From a43cce691dd3fe3bade2c32eed8143efb0de9458 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Thu, 17 Oct 2019 11:58:26 +0530 Subject: [PATCH 01/14] refactor(fiber): remove FiberContext.of() method. Use FiberContext.evaluateWith() method --- src/internals/FiberContext.ts | 6 ------ src/runtimes/BaseRuntime.ts | 12 +++++------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/src/internals/FiberContext.ts b/src/internals/FiberContext.ts index 960c0d40..db531270 100644 --- a/src/internals/FiberContext.ts +++ b/src/internals/FiberContext.ts @@ -56,12 +56,6 @@ export class FiberContext implements ICancellable, IFiber { return new FiberContext(scheduler, io.asInstruction) } - public static of( - scheduler: IScheduler, - p: FIO - ): FiberContext { - return new FiberContext(scheduler, p.asInstruction) - } private static dispatchResult( result: Option>, cb: CBOption diff --git a/src/runtimes/BaseRuntime.ts b/src/runtimes/BaseRuntime.ts index 8ee65255..8f820037 100644 --- a/src/runtimes/BaseRuntime.ts +++ b/src/runtimes/BaseRuntime.ts @@ -5,7 +5,6 @@ import {ICancellable, IScheduler} from 'ts-scheduler' import {CBOption} from '../internals/CBOption' import {FiberContext} from '../internals/FiberContext' -import {noop} from '../internals/Noop' import {FIO} from '../main/FIO' import {IRuntime} from './IRuntime' @@ -13,12 +12,11 @@ import {IRuntime} from './IRuntime' export abstract class BaseRuntime implements IRuntime { public abstract readonly scheduler: IScheduler - public unsafeExecute( - io: FIO, - cb: CBOption = noop - ): ICancellable { - const context = FiberContext.of(this.scheduler, io) - context.unsafeObserve(cb) + public unsafeExecute(io: FIO, cb?: CBOption): ICancellable { + const context = FiberContext.evaluateWith(io, this.scheduler) + if (cb !== undefined) { + context.unsafeObserve(cb) + } return context } From ec8ab130dc27c85b52e2852a9f26b3db078a0012 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Thu, 17 Oct 2019 13:05:49 +0530 Subject: [PATCH 02/14] refactor(test): snapshot and take in any type of data now --- test/internals/Snapshot.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/internals/Snapshot.ts b/test/internals/Snapshot.ts index 5dc04ba3..a95ad1b7 100644 --- a/test/internals/Snapshot.ts +++ b/test/internals/Snapshot.ts @@ -4,9 +4,9 @@ import {FIO, UIO} from '../../src/main/FIO' import {IRuntimeEnv} from '../../src/runtimes/IRuntime' -export class Snapshot { +export class Snapshot { public readonly timeline = new Array() - public mark(value: string): FIO { + public mark(value: T): FIO { return FIO.runtime().chain(RTM => UIO( () => void this.timeline.push(value + '@' + RTM.scheduler.now()) From 46ae2bb0567a09c195b3c9ec75624691d295b172 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Thu, 17 Oct 2019 21:28:41 +0530 Subject: [PATCH 03/14] feature(fiber): fiber can now automatically yield Fiber maintains a count of instructions that have been processed. After a given threshold, the fiber will yield and give time to other fibers to continue. --- src/internals/FiberContext.ts | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/src/internals/FiberContext.ts b/src/internals/FiberContext.ts index db531270..b1ddd182 100644 --- a/src/internals/FiberContext.ts +++ b/src/internals/FiberContext.ts @@ -53,7 +53,7 @@ export class FiberContext implements ICancellable, IFiber { io: IO, scheduler: IScheduler ): FiberContext { - return new FiberContext(scheduler, io.asInstruction) + return new FiberContext(io.asInstruction, scheduler) } private static dispatchResult( @@ -62,8 +62,9 @@ export class FiberContext implements ICancellable, IFiber { ): void { cb(result) } + private readonly cancellationList = new CancellationList() - private readonly node: LinkedListNode + private node?: LinkedListNode private readonly observers = DoublyLinkedList.of>() private result: Option> = Option.none() private readonly stackA = new Array() @@ -71,13 +72,12 @@ export class FiberContext implements ICancellable, IFiber { private status = FiberStatus.PENDING private constructor( + instruction: Instruction, private readonly scheduler: IScheduler, - instruction: Instruction + private readonly maxInstructionCount: number = Number.MAX_SAFE_INTEGER ) { this.stackA.push(instruction) - this.node = this.cancellationList.push( - this.scheduler.asap(this.unsafeEvaluate.bind(this)) - ) + this.init() } public cancel(): void { @@ -110,11 +110,25 @@ export class FiberContext implements ICancellable, IFiber { this.observers.map(_ => _(this.result)) } - private unsafeEvaluate(): void { - this.cancellationList.remove(this.node) - let data: unknown + private init(data?: unknown): void { + this.node = this.cancellationList.push( + this.scheduler.asap(this.unsafeEvaluate.bind(this), data) + ) + } + + private unsafeEvaluate(ddd?: unknown): void { + if (this.node !== undefined) { + this.cancellationList.remove(this.node) + this.node = undefined + } + let data: unknown = ddd + let count = 0 while (true) { + count++ + if (count === this.maxInstructionCount) { + return this.init(data) + } try { const j = this.stackA.pop() @@ -182,7 +196,7 @@ export class FiberContext implements ICancellable, IFiber { // A new context is created so that computation from that instruction can happen separately. // and then join back into the current context. // Using the same stack will corrupt it completely. - const nContext = new FiberContext(this.scheduler, j.i0) + const nContext = new FiberContext(j.i0, this.scheduler) this.cancellationList.push(nContext) data = nContext break From 3ac6dd34e804ec80f2a031bb624267b48ab7b34c Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Fri, 18 Oct 2019 08:04:21 +0530 Subject: [PATCH 04/14] refactor(test): update `Snapshot` default type params --- test/internals/Snapshot.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/internals/Snapshot.ts b/test/internals/Snapshot.ts index a95ad1b7..575c1ac1 100644 --- a/test/internals/Snapshot.ts +++ b/test/internals/Snapshot.ts @@ -4,7 +4,7 @@ import {FIO, UIO} from '../../src/main/FIO' import {IRuntimeEnv} from '../../src/runtimes/IRuntime' -export class Snapshot { +export class Snapshot { public readonly timeline = new Array() public mark(value: T): FIO { return FIO.runtime().chain(RTM => From 4be694d66be48fc0cfdd15b830c6811fc7e0176f Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Fri, 18 Oct 2019 15:05:54 +0530 Subject: [PATCH 05/14] feat(fiber): fiber now can switch between multiple contexts based on instruction count --- src/internals/FiberContext.ts | 9 ++-- src/main/FIO.ts | 7 ++- src/main/Instructions.ts | 1 + src/main/Queue.ts | 86 ++++++++++++++++++----------------- test/FiberContext.test.ts | 44 ++++++++++++++++++ 5 files changed, 100 insertions(+), 47 deletions(-) diff --git a/src/internals/FiberContext.ts b/src/internals/FiberContext.ts index b1ddd182..086caf67 100644 --- a/src/internals/FiberContext.ts +++ b/src/internals/FiberContext.ts @@ -51,9 +51,10 @@ export class FiberContext implements ICancellable, IFiber { */ public static evaluateWith( io: IO, - scheduler: IScheduler + scheduler: IScheduler, + maxInstructionCount: number = Number.MAX_SAFE_INTEGER ): FiberContext { - return new FiberContext(io.asInstruction, scheduler) + return new FiberContext(io.asInstruction, scheduler, maxInstructionCount) } private static dispatchResult( @@ -74,7 +75,7 @@ export class FiberContext implements ICancellable, IFiber { private constructor( instruction: Instruction, private readonly scheduler: IScheduler, - private readonly maxInstructionCount: number = Number.MAX_SAFE_INTEGER + private readonly maxInstructionCount: number ) { this.stackA.push(instruction) this.init() @@ -196,7 +197,7 @@ export class FiberContext implements ICancellable, IFiber { // A new context is created so that computation from that instruction can happen separately. // and then join back into the current context. // Using the same stack will corrupt it completely. - const nContext = new FiberContext(j.i0, this.scheduler) + const nContext = new FiberContext(j.i0, this.scheduler, j.i1) this.cancellationList.push(nContext) data = nContext break diff --git a/src/main/FIO.ts b/src/main/FIO.ts index 46a1b7a0..1da1f652 100644 --- a/src/main/FIO.ts +++ b/src/main/FIO.ts @@ -280,8 +280,11 @@ export class FIO { /** * Creates a new [[Fiber]] to run the given [[IO]]. */ - public static fork(io: IO): UIO> { - return new FIO(Tag.Fork, io) + public static fork( + io: IO, + maxInstructionCount: number = Number.MAX_SAFE_INTEGER + ): UIO> { + return new FIO(Tag.Fork, io, maxInstructionCount) } /** diff --git a/src/main/Instructions.ts b/src/main/Instructions.ts index d4d286fb..0268093f 100644 --- a/src/main/Instructions.ts +++ b/src/main/Instructions.ts @@ -69,6 +69,7 @@ export interface INever { } export interface IFork { i0: Instruction + i1: number // TODO: Instead of number pass runtime. tag: Tag.Fork } export interface IAccess { diff --git a/src/main/Queue.ts b/src/main/Queue.ts index adcbdbad..27e997b4 100644 --- a/src/main/Queue.ts +++ b/src/main/Queue.ts @@ -1,6 +1,4 @@ -import {LinkedListNode, List} from 'standard-data-structures' - -import {PureMutableList} from '../internals/PureMutableList' +import {DoublyLinkedList, List, Option} from 'standard-data-structures' import {Await} from './Await' import {FIO, NoEnv, UIO} from './FIO' @@ -14,41 +12,44 @@ export class Queue { * Returns the Queue as an array */ public get asArray(): UIO { - return this.Q.asArray + return UIO(() => this.Q.asArray) } /** * Returns the number of elements in the queue */ public get length(): UIO { - return this.Q.length + return UIO(() => this.Q.length) } /** * Pulls an item from the queue */ public get take(): UIO { - return this.Q.shift.chain(sz => - sz - .map(FIO.of) - .getOrElse( - FIO.flatten( - Await.of().chain( - FIO.encase(await => this.T.add(await).and(await.get)) - ) - ) + return FIO.flattenM(() => { + const sz = this.Q.shift() + + if (Option.isSome(sz)) { + return FIO.of(sz.value) + } + + return FIO.flatten( + Await.of().chain( + FIO.encase(await => { + this.T.add(await) + + return await.get + }) ) - ) + ) + }) } /** * Creates a new bounded Queue */ public static bounded(capacity: number): UIO> { - return PureMutableList.of().zipWith( - PureMutableList.of>(), - (Q, T) => new Queue(capacity, Q, T) - ) + return UIO(() => new Queue(capacity)) } /** @@ -58,24 +59,40 @@ export class Queue { return Queue.bounded(Number.MAX_SAFE_INTEGER) } - private constructor( - public readonly capacity: number, - private readonly Q: PureMutableList, - private readonly T: PureMutableList> - ) {} + private readonly Q = DoublyLinkedList.of() + private readonly T = DoublyLinkedList.of>() + private constructor(public readonly capacity: number) {} /** * Inserts an item into the queue */ - public offer(a: A): UIO> { - return this.Q.add(a).tap(_ => this.setAwaited(_.value)) + public offer(a: A): UIO { + return FIO.flattenM( + (): UIO => { + if (this.T.length === 0) { + this.Q.add(a) + + return FIO.void() + } + + const io = new Array>() + while (this.T.length !== 0) { + const item = this.T.shift() + if (Option.isSome(item)) { + io.push(item.value.set(FIO.of(a))) + } + } + + return FIO.seq(io).void + } + ) } /** * Adds all the provided items into the queue */ - public offerAll(...a: A[]): UIO>> { - return FIO.seq(a.map(_ => this.offer(_))) + public offerAll(...a: A[]): UIO { + return FIO.seq(a.map(_ => this.offer(_))).void } /** @@ -92,19 +109,6 @@ export class Queue { return itar(0, List.empty()).map(_ => _.asArray) } - private setAwaited(value: A): UIO { - const itar = (list: List>): UIO>> => - this.T.shift.chain(_ => - _.map(AWT => itar(list.prepend(AWT.set(FIO.of(value))))).getOrElse( - FIO.of(list) - ) - ) - - return itar(List.empty>()) - .tap(_ => (!_.isEmpty ? this.Q.shift : FIO.void())) - .chain(_ => FIO.seq(_.asArray)) - } - /** * Converts a queue into a [[FStream]] */ diff --git a/test/FiberContext.test.ts b/test/FiberContext.test.ts index ff7a86b1..7bd16867 100644 --- a/test/FiberContext.test.ts +++ b/test/FiberContext.test.ts @@ -4,6 +4,7 @@ import {testScheduler} from 'ts-scheduler/test' import {FiberContext} from '../src/internals/FiberContext' import {FIO} from '../src/main/FIO' +import {FStream} from '../src/main/FStream' import {testRuntime} from '../src/runtimes/TestRuntime' import {Counter} from './internals/Counter' @@ -196,4 +197,47 @@ describe('FiberContext', () => { it.skip('should return none') }) }) + + context('instruction count is reduced', () => { + it.skip('should cooperatively merge two streams', () => { + const list = new Array() + const insert = FIO.encase((_: number) => void list.push(_)) + + const scheduler = testScheduler() + FiberContext.evaluateWith( + FStream.range(101, 103) + .merge(FStream.range(901, 903)) + .mapM(insert).drain, + scheduler, + 5 + ) + + scheduler.run() + + const expected = [901, 101, 102, 103, 902, 903] + assert.deepStrictEqual(list, expected) + }) + + it('should switch between multiple contexts', () => { + const MAX_INSTRUCTION_COUNT = 5 + const scheduler = testScheduler() + const actual = new Array() + const insert = FIO.encase((_: number) => void actual.push(_)) + const longIO = FIO.of(1) + .and(FIO.of(2)) + .and(FIO.of(3)) + .and(FIO.of(4)) + .and(FIO.of(5)) + .chain(insert) + const shortIO = FIO.of(1000).chain(insert) + + FiberContext.evaluateWith(longIO, scheduler, MAX_INSTRUCTION_COUNT) + FiberContext.evaluateWith(shortIO, scheduler, 5) + + scheduler.run() + + const expected = [1000, 5] + assert.deepStrictEqual(actual, expected) + }) + }) }) From 8c8f85f0e7f7b895b9420707dd5ce48398169636 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Fri, 18 Oct 2019 16:17:00 +0530 Subject: [PATCH 06/14] refactor(fiber): change fiber file structure - move IFiber inside FiberContext code - rename FiberContext to Fiber --- index.ts | 2 +- src/internals/{FiberContext.ts => Fiber.ts} | 15 ++++++++++++++- src/main/FIO.ts | 2 +- src/main/IFiber.ts | 17 ----------------- src/runtimes/BaseRuntime.ts | 2 +- test/FIO.test.ts | 2 +- test/FiberContext.test.ts | 2 +- 7 files changed, 19 insertions(+), 23 deletions(-) rename src/internals/{FiberContext.ts => Fiber.ts} (93%) delete mode 100644 src/main/IFiber.ts diff --git a/index.ts b/index.ts index 2bad55bb..6de8a183 100644 --- a/index.ts +++ b/index.ts @@ -3,7 +3,7 @@ export {Await} from './src/main/Await' export {defaultRuntime} from './src/runtimes/DefaultRuntime' export {FIO, IO, Task, TaskR, UIO} from './src/main/FIO' export {FStream, Stream} from './src/main/FStream' -export {IFiber} from './src/main/IFiber' +export {IFiber} from './src/internals/Fiber' export {IRuntime} from './src/runtimes/IRuntime' export {Managed} from './src/main/Managed' export {Queue} from './src/main/Queue' diff --git a/src/internals/FiberContext.ts b/src/internals/Fiber.ts similarity index 93% rename from src/internals/FiberContext.ts rename to src/internals/Fiber.ts index 086caf67..a7c150c0 100644 --- a/src/internals/FiberContext.ts +++ b/src/internals/Fiber.ts @@ -10,7 +10,6 @@ import { import {ICancellable, IScheduler} from 'ts-scheduler' import {FIO, IO, UIO} from '../main/FIO' -import {IFiber} from '../main/IFiber' import {Instruction, Tag} from '../main/Instructions' import {CancellationList} from './CancellationList' @@ -27,6 +26,20 @@ enum FiberStatus { CANCELLED } +/** + * Fibers are data structures that provide you a handle to control the execution of its `IO`. + * They can be created by calling the [[FIO.fork]] method. + * Fiber created is always going to be in a `Paused` state. To resume the fiber, you should call the `resume` or the `resumeAsync` methods. + * @typeparam E Exceptions that can be thrown + * @typeparam A The success value + */ +export interface IFiber { + abort: UIO + await: UIO>> + join: FIO + release(p: UIO): UIO +} + /** * FiberContext actually evaluates the FIO expression. * Its creation is effectful. diff --git a/src/main/FIO.ts b/src/main/FIO.ts index 1da1f652..e98aa4c8 100644 --- a/src/main/FIO.ts +++ b/src/main/FIO.ts @@ -6,11 +6,11 @@ import {Either, List, Option} from 'standard-data-structures' import {ICancellable} from 'ts-scheduler' import {CB} from '../internals/CB' +import {IFiber} from '../internals/Fiber' import {Id} from '../internals/Id' import {IRuntime, IRuntimeEnv} from '../runtimes/IRuntime' import {Await} from './Await' -import {IFiber} from './IFiber' import {Instruction, Tag} from './Instructions' export type NoEnv = unknown diff --git a/src/main/IFiber.ts b/src/main/IFiber.ts deleted file mode 100644 index 19eae6d7..00000000 --- a/src/main/IFiber.ts +++ /dev/null @@ -1,17 +0,0 @@ -import {Either, Option} from 'standard-data-structures' - -import {FIO, UIO} from './FIO' - -/** - * Fibers are data structures that provide you a handle to control the execution of its `IO`. - * They can be created by calling the [[FIO.fork]] method. - * Fiber created is always going to be in a `Paused` state. To resume the fiber, you should call the `resume` or the `resumeAsync` methods. - * @typeparam E Exceptions that can be thrown - * @typeparam A The success value - */ -export interface IFiber { - abort: UIO - await: UIO>> - join: FIO - release(p: UIO): UIO -} diff --git a/src/runtimes/BaseRuntime.ts b/src/runtimes/BaseRuntime.ts index 8f820037..21d10d9f 100644 --- a/src/runtimes/BaseRuntime.ts +++ b/src/runtimes/BaseRuntime.ts @@ -4,7 +4,7 @@ import {ICancellable, IScheduler} from 'ts-scheduler' import {CBOption} from '../internals/CBOption' -import {FiberContext} from '../internals/FiberContext' +import {FiberContext} from '../internals/Fiber' import {FIO} from '../main/FIO' import {IRuntime} from './IRuntime' diff --git a/test/FIO.test.ts b/test/FIO.test.ts index 8aecea2b..58734e7b 100644 --- a/test/FIO.test.ts +++ b/test/FIO.test.ts @@ -5,7 +5,7 @@ import {assert, spy} from 'chai' import {Either} from 'standard-data-structures' -import {FiberContext} from '../src/internals/FiberContext' +import {FiberContext} from '../src/internals/Fiber' import {FIO, UIO} from '../src/main/FIO' import {defaultRuntime} from '../src/runtimes/DefaultRuntime' import {IRuntime} from '../src/runtimes/IRuntime' diff --git a/test/FiberContext.test.ts b/test/FiberContext.test.ts index 7bd16867..7b6f97fd 100644 --- a/test/FiberContext.test.ts +++ b/test/FiberContext.test.ts @@ -2,7 +2,7 @@ import {assert, spy} from 'chai' import {Either, Option} from 'standard-data-structures' import {testScheduler} from 'ts-scheduler/test' -import {FiberContext} from '../src/internals/FiberContext' +import {FiberContext} from '../src/internals/Fiber' import {FIO} from '../src/main/FIO' import {FStream} from '../src/main/FStream' import {testRuntime} from '../src/runtimes/TestRuntime' From 7f02aafcf57621b42aee77e23dfca79d2a14afba Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Fri, 18 Oct 2019 19:56:13 +0530 Subject: [PATCH 07/14] refactor(fiber): change access to runtime --- example/guess-the-number/src/Run.ts | 3 +- example/guess-the-number/test/Program.test.ts | 20 ++-- index.ts | 2 +- src/internals/Fiber.ts | 68 ++++++++--- src/main/FIO.ts | 51 ++++---- src/main/FStream.ts | 5 +- src/main/Instructions.ts | 8 +- src/prototypes/FServer.ts | 9 +- src/runtimes/BaseRuntime.ts | 12 +- src/runtimes/DefaultRuntime.ts | 9 +- src/runtimes/IRuntime.ts | 8 +- src/runtimes/TestRuntime.ts | 10 +- test/Await.test.ts | 13 +- test/FIO.test.ts | 99 ++++++---------- test/FStream.test.ts | 1 - test/FiberContext.test.ts | 111 ++++++++---------- test/Managed.test.ts | 2 - test/Queue.test.ts | 12 +- test/internals/Snapshot.ts | 3 +- 19 files changed, 204 insertions(+), 242 deletions(-) diff --git a/example/guess-the-number/src/Run.ts b/example/guess-the-number/src/Run.ts index c47fc5fb..51200b1d 100644 --- a/example/guess-the-number/src/Run.ts +++ b/example/guess-the-number/src/Run.ts @@ -18,9 +18,8 @@ const runtime = defaultRuntime() runtime.unsafeExecute( program.provide({ math: Math, - runtime, tty: { - getStrLn: FIO.pipeEnv(getStrLn, {process, readline, runtime}), + getStrLn: FIO.pipeEnv(getStrLn, {process, readline}), putStrLn: FIO.pipeEnv(putStrLn, {console}) } }) diff --git a/example/guess-the-number/test/Program.test.ts b/example/guess-the-number/test/Program.test.ts index 5990cb45..60e21e3c 100644 --- a/example/guess-the-number/test/Program.test.ts +++ b/example/guess-the-number/test/Program.test.ts @@ -46,7 +46,7 @@ describe('Program', () => { const math = MockMath() const tty = MockTTY({}) const runtime = testRuntime() - runtime.unsafeExecuteSync(program.provide({math, tty, runtime})) + runtime.unsafeExecuteSync(program.provide({math, tty})) assert.deepStrictEqual(tty.stdout, ['Greetings!', 'Enter your name: ']) }) @@ -57,7 +57,7 @@ describe('Program', () => { 'Enter your name: ': ['John'] }) const runtime = testRuntime() - runtime.unsafeExecuteSync(program.provide({math, tty, runtime})) + runtime.unsafeExecuteSync(program.provide({math, tty})) assert.deepStrictEqual(tty.stdout, [ 'Greetings!', @@ -74,7 +74,7 @@ describe('Program', () => { 'Enter your name: ': ['John'] }) const runtime = testRuntime() - runtime.unsafeExecuteSync(program.provide({math, tty, runtime})) + runtime.unsafeExecuteSync(program.provide({math, tty})) assert.deepStrictEqual(tty.stdout, [ 'Greetings!', @@ -94,7 +94,7 @@ describe('Program', () => { 'Enter your name: ': ['John'] }) const runtime = testRuntime() - runtime.unsafeExecuteSync(program.provide({math, tty, runtime})) + runtime.unsafeExecuteSync(program.provide({math, tty})) assert.deepStrictEqual(tty.stdout, [ 'Greetings!', @@ -115,7 +115,7 @@ describe('Program', () => { 'Press ⏎ to continue (or will exit in 3sec): ': ['', ''] }) const runtime = testRuntime() - runtime.unsafeExecuteSync(program.provide({math, tty, runtime})) + runtime.unsafeExecuteSync(program.provide({math, tty})) assert.deepStrictEqual(tty.stdout, [ 'Greetings!', @@ -138,9 +138,7 @@ describe('Program', () => { 'Press ⏎ to continue (or will exit in 3sec): ': [''] }) const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync( - canContinue.provide({tty, runtime}) - ) + const actual = runtime.unsafeExecuteSync(canContinue.provide({tty})) assert.isTrue(actual) }) @@ -149,16 +147,14 @@ describe('Program', () => { it('should return false', () => { const tty = MockTTY({}) const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync( - canContinue.provide({tty, runtime}) - ) + const actual = runtime.unsafeExecuteSync(canContinue.provide({tty})) assert.isFalse(actual) }) it('should output goodbye', () => { const tty = MockTTY({}) const runtime = testRuntime() - runtime.unsafeExecuteSync(canContinue.provide({tty, runtime})) + runtime.unsafeExecuteSync(canContinue.provide({tty})) assert.deepStrictEqual(tty.stdout, [ 'Press ⏎ to continue (or will exit in 3sec): ', diff --git a/index.ts b/index.ts index 6de8a183..cea7b139 100644 --- a/index.ts +++ b/index.ts @@ -3,7 +3,7 @@ export {Await} from './src/main/Await' export {defaultRuntime} from './src/runtimes/DefaultRuntime' export {FIO, IO, Task, TaskR, UIO} from './src/main/FIO' export {FStream, Stream} from './src/main/FStream' -export {IFiber} from './src/internals/Fiber' +export {Fiber} from './src/internals/Fiber' export {IRuntime} from './src/runtimes/IRuntime' export {Managed} from './src/main/Managed' export {Queue} from './src/main/Queue' diff --git a/src/internals/Fiber.ts b/src/internals/Fiber.ts index a7c150c0..e403d3f6 100644 --- a/src/internals/Fiber.ts +++ b/src/internals/Fiber.ts @@ -7,10 +7,11 @@ import { LinkedListNode, Option } from 'standard-data-structures' -import {ICancellable, IScheduler} from 'ts-scheduler' +import {ICancellable} from 'ts-scheduler' import {FIO, IO, UIO} from '../main/FIO' import {Instruction, Tag} from '../main/Instructions' +import {IRuntime} from '../runtimes/IRuntime' import {CancellationList} from './CancellationList' import {CBOption} from './CBOption' @@ -33,11 +34,23 @@ enum FiberStatus { * @typeparam E Exceptions that can be thrown * @typeparam A The success value */ -export interface IFiber { - abort: UIO - await: UIO>> - join: FIO - release(p: UIO): UIO +export abstract class Fiber { + /** + * Executes the provided [[FIO]] expression. + * Returns a [[ICancellable]] that can be used to interrupt the execution. + */ + public static unsafeExecute( + io: IO, + runtime: IRuntime, + cb?: CBOption + ): ICancellable { + return FiberContext.unsafeExecute(io, runtime, cb) + } + public abstract abort: UIO + public abstract await: UIO>> + public abstract join: FIO + public abstract runtime: IRuntime + public abstract release(p: UIO): UIO } /** @@ -46,7 +59,7 @@ export interface IFiber { * As soon as its created it starts to evaluate the FIO expression. * It provides public APIs to [[Fiber]] to consume. */ -export class FiberContext implements ICancellable, IFiber { +export class FiberContext extends Fiber implements ICancellable { public get abort(): UIO { return UIO(() => this.cancel()) } @@ -62,12 +75,17 @@ export class FiberContext implements ICancellable, IFiber { /** * Evaluates an IO using the provided scheduler */ - public static evaluateWith( + public static unsafeExecute( io: IO, - scheduler: IScheduler, - maxInstructionCount: number = Number.MAX_SAFE_INTEGER + runtime: IRuntime, + cb?: CBOption ): FiberContext { - return new FiberContext(io.asInstruction, scheduler, maxInstructionCount) + const context = new FiberContext(io.asInstruction, runtime) + if (cb !== undefined) { + context.unsafeObserve(cb) + } + + return context } private static dispatchResult( @@ -87,9 +105,9 @@ export class FiberContext implements ICancellable, IFiber { private constructor( instruction: Instruction, - private readonly scheduler: IScheduler, - private readonly maxInstructionCount: number + public readonly runtime: IRuntime ) { + super() this.stackA.push(instruction) this.init() } @@ -107,10 +125,18 @@ export class FiberContext implements ICancellable, IFiber { public unsafeObserve(cb: CBOption): ICancellable { if (this.status === FiberStatus.CANCELLED) { - return this.scheduler.asap(FiberContext.dispatchResult, Option.none(), cb) + return this.runtime.scheduler.asap( + FiberContext.dispatchResult, + Option.none(), + cb + ) } if (this.status === FiberStatus.COMPLETED) { - return this.scheduler.asap(FiberContext.dispatchResult, this.result, cb) + return this.runtime.scheduler.asap( + FiberContext.dispatchResult, + this.result, + cb + ) } const node = this.observers.add(cb) @@ -126,7 +152,7 @@ export class FiberContext implements ICancellable, IFiber { private init(data?: unknown): void { this.node = this.cancellationList.push( - this.scheduler.asap(this.unsafeEvaluate.bind(this), data) + this.runtime.scheduler.asap(this.unsafeEvaluate.bind(this), data) ) } @@ -140,7 +166,7 @@ export class FiberContext implements ICancellable, IFiber { let count = 0 while (true) { count++ - if (count === this.maxInstructionCount) { + if (count === this.runtime.maxInstructionCount) { return this.init(data) } try { @@ -159,6 +185,10 @@ export class FiberContext implements ICancellable, IFiber { this.stackA.push(j.i0(...j.i1)) break + case Tag.Runtime: + data = this.runtime + break + case Tag.Reject: while ( this.stackA.length > 0 && @@ -210,7 +240,7 @@ export class FiberContext implements ICancellable, IFiber { // A new context is created so that computation from that instruction can happen separately. // and then join back into the current context. // Using the same stack will corrupt it completely. - const nContext = new FiberContext(j.i0, this.scheduler, j.i1) + const nContext = new FiberContext(j.i0, j.i1) this.cancellationList.push(nContext) data = nContext break @@ -263,7 +293,7 @@ export class FiberContext implements ICancellable, IFiber { private unsafeRelease(p: UIO): void { this.cancellationList.push({ - cancel: () => FiberContext.evaluateWith(p, this.scheduler) + cancel: () => Fiber.unsafeExecute(p, this.runtime) }) } } diff --git a/src/main/FIO.ts b/src/main/FIO.ts index e98aa4c8..bcbaa1f4 100644 --- a/src/main/FIO.ts +++ b/src/main/FIO.ts @@ -6,9 +6,9 @@ import {Either, List, Option} from 'standard-data-structures' import {ICancellable} from 'ts-scheduler' import {CB} from '../internals/CB' -import {IFiber} from '../internals/Fiber' +import {Fiber} from '../internals/Fiber' import {Id} from '../internals/Id' -import {IRuntime, IRuntimeEnv} from '../runtimes/IRuntime' +import {IRuntime} from '../runtimes/IRuntime' import {Await} from './Await' import {Instruction, Tag} from './Instructions' @@ -78,8 +78,10 @@ export class FIO { /** * Returns a [[Fiber]]. The returned fiber is always in a paused state. */ - public get fork(): FIO, R1> { - return FIO.env().chain(env => FIO.fork(this.provide(env))) + public get fork(): FIO, R1> { + return FIO.env().zipWithM(FIO.runtime(), (ENV, RTM) => + FIO.fork(this.provide(ENV), RTM) + ) } /** @@ -119,7 +121,7 @@ export class FIO { */ public static accessP( cb: (R: R1) => Promise - ): FIO { + ): FIO { return FIO.env().chain(FIO.encaseP(cb)) } @@ -192,9 +194,7 @@ export class FIO { /** * Creates a [[FIO]] using a callback function. */ - public static cb( - fn: (cb: (A1: A1) => void) => void - ): FIO { + public static cb(fn: (cb: (A1: A1) => void) => void): UIO { return FIO.runtime().chain(RTM => FIO.asyncUIO(res => RTM.scheduler.asap(fn, res)) ) @@ -224,7 +224,7 @@ export class FIO { */ public static encaseP( cb: (...t: T) => Promise - ): (...t: T) => FIO { + ): (...t: T) => FIO { return (...t) => FIO.runtime().chain(RTM => FIO.asyncIO((rej, res) => @@ -282,9 +282,9 @@ export class FIO { */ public static fork( io: IO, - maxInstructionCount: number = Number.MAX_SAFE_INTEGER - ): UIO> { - return new FIO(Tag.Fork, io, maxInstructionCount) + runtime: IRuntime + ): UIO> { + return new FIO(Tag.Fork, io, runtime) } /** @@ -348,12 +348,12 @@ export class FIO { */ public static node( fn: (cb: NodeJSCallback) => void - ): FIO { + ): FIO { return FIO.runtime().chain(RTM => - FIO.asyncIO((rej, res) => + FIO.asyncIO((rej, res) => RTM.scheduler.asap(() => { try { - fn((err, result) => (err === null ? res(result as A) : rej(err))) + fn((err, result) => (err === null ? res(result) : rej(err))) } catch (e) { rej(e as NodeJS.ErrnoException) } @@ -438,8 +438,8 @@ export class FIO { /** * Returns the current runtime in a pure way. */ - public static runtime(): FIO { - return FIO.access((_: IRuntimeEnv) => _.runtime) + public static runtime(): UIO { + return new FIO(Tag.Runtime) } /** @@ -460,12 +460,9 @@ export class FIO { /** * Resolves with the provided value after the given time */ - public static timeout( - value: A, - duration: number - ): FIO { + public static timeout(value: A, duration: number): FIO { return FIO.runtime().chain(RTM => - FIO.asyncIO((rej, res) => RTM.scheduler.delay(res, duration, value)) + FIO.asyncUIO(res => RTM.scheduler.delay(res, duration, value)) ) } @@ -479,7 +476,7 @@ export class FIO { /** * Tries to run an function that returns a promise. */ - public static tryP(cb: () => Promise): TaskR { + public static tryP(cb: () => Promise): TaskR { return FIO.encaseP(cb)() } @@ -489,7 +486,7 @@ export class FIO { */ public static uninterruptibleIO( fn: (rej: CB, res: CB) => unknown - ): FIO { + ): IO { return FIO.runtime().chain(RTM => FIO.asyncIO((rej, res) => RTM.scheduler.asap(fn, rej, res)) ) @@ -570,7 +567,7 @@ export class FIO { /** * Delays the execution of the [[FIO]] by the provided time. */ - public delay(duration: number): FIO { + public delay(duration: number): FIO { return FIO.timeout(this, duration).chain(Id) } @@ -652,8 +649,8 @@ export class FIO { */ public raceWith( that: FIO, - cb1: (exit: Either, fiber: IFiber) => IO, - cb2: (exit: Either, fiber: IFiber) => IO + cb1: (exit: Either, fiber: Fiber) => IO, + cb2: (exit: Either, fiber: Fiber) => IO ): FIO { return Await.of().chain(done => this.fork.zip(that.fork).chain(({0: f1, 1: f2}) => { diff --git a/src/main/FStream.ts b/src/main/FStream.ts index c4b1d50c..c616b38a 100644 --- a/src/main/FStream.ts +++ b/src/main/FStream.ts @@ -2,7 +2,6 @@ import {EventEmitter} from 'events' import {List} from 'standard-data-structures' import {T} from '../internals/T' -import {IRuntimeEnv} from '../runtimes/IRuntime' import {FIO, NoEnv, UIO} from './FIO' import {Managed, UManaged} from './Managed' @@ -88,7 +87,7 @@ export class FStream { public static fromEventEmitter( ev: EventEmitter, name: string - ): FIO>, IRuntimeEnv> { + ): FIO>> { return FIO.runtime().zipWith(Queue.bounded(1), (RTM, Q) => { const onEvent = (a: A) => RTM.unsafeExecute(Q.offer(a)) @@ -127,7 +126,7 @@ export class FStream { public static interval( A1: A1, duration: number - ): FStream { + ): FStream { return FStream.produce(FIO.timeout(A1, duration)) } diff --git a/src/main/Instructions.ts b/src/main/Instructions.ts index 0268093f..5a861609 100644 --- a/src/main/Instructions.ts +++ b/src/main/Instructions.ts @@ -1,6 +1,7 @@ import {ICancellable} from 'ts-scheduler' import {CB} from '../internals/CB' +import {IRuntime} from '../runtimes/IRuntime' /** * @ignore @@ -69,7 +70,7 @@ export interface INever { } export interface IFork { i0: Instruction - i1: number // TODO: Instead of number pass runtime. + i1: IRuntime tag: Tag.Fork } export interface IAccess { @@ -87,6 +88,10 @@ export interface ICapture { i0(i: A): Instruction } +export interface IRTime { + tag: Tag.Runtime +} + /** * @ignore */ @@ -103,5 +108,6 @@ export type Instruction = | INever | IProvide | IReject + | IRTime | ITry | ITryM diff --git a/src/prototypes/FServer.ts b/src/prototypes/FServer.ts index 5052c191..818e7980 100644 --- a/src/prototypes/FServer.ts +++ b/src/prototypes/FServer.ts @@ -1,7 +1,6 @@ import * as http from 'http' import {defaultRuntime, FIO, IRuntime, Managed, UIO} from '../..' -import {IRuntimeEnv} from '../runtimes/IRuntime' const Exit = FIO.encase((message: Error) => { process.exit(1) @@ -13,7 +12,7 @@ interface IFIOServerOptions { } class FIOServerBuilder { - public get serve(): FIO { + public get serve(): UIO { return Managed.make( FIO.runtime().encase(RTM => new FIOServer(RTM, this.options)), server => server.close @@ -59,7 +58,7 @@ class FIOServer { } } - public get close(): FIO { + public get close(): UIO { return FIO.uninterruptibleIO((rej, res) => () => this.server.close(E => (E !== undefined ? rej(E) : res())) ).catch(Exit) @@ -68,7 +67,5 @@ class FIOServer { const runtime = defaultRuntime() runtime.unsafeExecute( - FIOServerBuilder.of() - .mount('/greet', () => FIO.of('Hello World!')) - .serve.provide({runtime}) + FIOServerBuilder.of().mount('/greet', () => FIO.of('Hello World!')).serve ) diff --git a/src/runtimes/BaseRuntime.ts b/src/runtimes/BaseRuntime.ts index 21d10d9f..ffc7c589 100644 --- a/src/runtimes/BaseRuntime.ts +++ b/src/runtimes/BaseRuntime.ts @@ -4,20 +4,18 @@ import {ICancellable, IScheduler} from 'ts-scheduler' import {CBOption} from '../internals/CBOption' -import {FiberContext} from '../internals/Fiber' +import {Fiber} from '../internals/Fiber' import {FIO} from '../main/FIO' import {IRuntime} from './IRuntime' export abstract class BaseRuntime implements IRuntime { public abstract readonly scheduler: IScheduler + public constructor( + public readonly maxInstructionCount: number = Number.MAX_SAFE_INTEGER + ) {} public unsafeExecute(io: FIO, cb?: CBOption): ICancellable { - const context = FiberContext.evaluateWith(io, this.scheduler) - if (cb !== undefined) { - context.unsafeObserve(cb) - } - - return context + return Fiber.unsafeExecute(io, this, cb) } } diff --git a/src/runtimes/DefaultRuntime.ts b/src/runtimes/DefaultRuntime.ts index 1c103ac4..9fcd3f5e 100644 --- a/src/runtimes/DefaultRuntime.ts +++ b/src/runtimes/DefaultRuntime.ts @@ -4,8 +4,14 @@ import {FIO} from '../main/FIO' import {BaseRuntime} from './BaseRuntime' +interface IDefaultRuntimeOptions { + maxInstructionCount: number +} export class DefaultRuntime extends BaseRuntime { public scheduler = scheduler + public constructor(maxInstructionCount?: number) { + super(maxInstructionCount) + } public async unsafeExecutePromise(io: FIO): Promise { return new Promise((res, rej) => { @@ -14,4 +20,5 @@ export class DefaultRuntime extends BaseRuntime { } } -export const defaultRuntime = () => new DefaultRuntime() +export const defaultRuntime = (O: Partial = {}) => + new DefaultRuntime(O.maxInstructionCount) diff --git a/src/runtimes/IRuntime.ts b/src/runtimes/IRuntime.ts index a54506e5..7aa3a864 100644 --- a/src/runtimes/IRuntime.ts +++ b/src/runtimes/IRuntime.ts @@ -14,6 +14,7 @@ import {FIO} from '../main/FIO' * Actual implementation is available at [[DefaultRuntime]] & [[TestRuntime]]. */ export interface IRuntime { + maxInstructionCount: number scheduler: IScheduler /** @@ -22,10 +23,3 @@ export interface IRuntime { */ unsafeExecute(io: FIO, cb?: CBOption): ICancellable } - -/** - * Env needed to get access to the current [[IRuntime]]. - */ -export interface IRuntimeEnv { - runtime: IRuntime -} diff --git a/src/runtimes/TestRuntime.ts b/src/runtimes/TestRuntime.ts index 1273c68e..155ef1fa 100644 --- a/src/runtimes/TestRuntime.ts +++ b/src/runtimes/TestRuntime.ts @@ -11,10 +11,13 @@ import {FIO, IO} from '../main/FIO' import {BaseRuntime} from './BaseRuntime' +type TestRuntimeOptions = Partial< + ITestSchedulerOptions & {maxInstructionCount: number} +> export class TestRuntime extends BaseRuntime { public readonly scheduler: TestScheduler - public constructor(options?: Partial) { - super() + public constructor(options: TestRuntimeOptions) { + super(options.maxInstructionCount) this.scheduler = testScheduler(options) } @@ -33,5 +36,4 @@ export class TestRuntime extends BaseRuntime { } } -export const testRuntime = (O?: Partial) => - new TestRuntime(O) +export const testRuntime = (O: TestRuntimeOptions = {}) => new TestRuntime(O) diff --git a/test/Await.test.ts b/test/Await.test.ts index 301e37d7..9875cff3 100644 --- a/test/Await.test.ts +++ b/test/Await.test.ts @@ -64,14 +64,7 @@ describe('Await', () => { const counter = new Counter() // Create an IO that takes a second to run - runtime.unsafeExecuteSync( - AWT.set( - counter - .inc() - .delay(1000) - .provide({runtime}) - ) - ) + runtime.unsafeExecuteSync(AWT.set(counter.inc().delay(1000))) // Run till 500 (half time for the original IO runtime.scheduler.runTo(500) @@ -123,9 +116,7 @@ describe('Await', () => { ) as Await const res = spy() runtime.unsafeExecute(await.get, res) - runtime.unsafeExecute( - await.set(FIO.timeout('Hey', 1000).provide({runtime})) - ) + runtime.unsafeExecute(await.set(FIO.timeout('Hey', 1000))) res.should.not.be.called() runtime.scheduler.run() diff --git a/test/FIO.test.ts b/test/FIO.test.ts index 58734e7b..e6b9259c 100644 --- a/test/FIO.test.ts +++ b/test/FIO.test.ts @@ -8,7 +8,6 @@ import {Either} from 'standard-data-structures' import {FiberContext} from '../src/internals/Fiber' import {FIO, UIO} from '../src/main/FIO' import {defaultRuntime} from '../src/runtimes/DefaultRuntime' -import {IRuntime} from '../src/runtimes/IRuntime' import {testRuntime} from '../src/runtimes/TestRuntime' import {Counter} from './internals/Counter' @@ -161,16 +160,14 @@ describe('FIO', () => { describe('timeout', () => { it('should emit the provided value', () => { const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync( - FIO.timeout('Happy', 100).provide({runtime}) - ) + const actual = runtime.unsafeExecuteSync(FIO.timeout('Happy', 100)) const expected = 'Happy' assert.strictEqual(actual, expected) }) it('should emit after the provided duration', () => { const runtime = testRuntime() - runtime.unsafeExecuteSync(FIO.timeout('Happy', 100).provide({runtime})) + runtime.unsafeExecuteSync(FIO.timeout('Happy', 100)) const actual = runtime.scheduler.now() const expected = 101 assert.strictEqual(actual, expected) @@ -182,9 +179,7 @@ describe('FIO', () => { let executedAt = -1 const runtime = testRuntime() runtime.unsafeExecuteSync( - FIO.try(() => (executedAt = runtime.scheduler.now())) - .delay(1000) - .provide({runtime}) + FIO.try(() => (executedAt = runtime.scheduler.now())).delay(1000) ) const expected = 1001 @@ -192,7 +187,7 @@ describe('FIO', () => { }) it('should emit after the provided duration', () => { const runtime = testRuntime() - runtime.unsafeExecuteSync(FIO.timeout('Happy', 100).provide({runtime})) + runtime.unsafeExecuteSync(FIO.timeout('Happy', 100)) const actual = runtime.scheduler.now() const expected = 101 assert.strictEqual(actual, expected) @@ -202,9 +197,7 @@ describe('FIO', () => { let executed = false const runtime = testRuntime() const cancellable = runtime.unsafeExecute( - FIO.try(() => (executed = true)) - .delay(100) - .provide({runtime}) + FIO.try(() => (executed = true)).delay(100) ) runtime.scheduler.runTo(50) assert.notOk(executed) @@ -229,10 +222,7 @@ describe('FIO', () => { it('should resolve the encased function', async () => { const runtime = defaultRuntime() const actual = await runtime.unsafeExecutePromise( - FIO.encaseP((a: number, b: number) => Promise.resolve(a + b))( - 1, - 1000 - ).provide({runtime}) + FIO.encaseP((a: number, b: number) => Promise.resolve(a + b))(1, 1000) ) const expected = 1001 assert.strictEqual(actual, expected) @@ -261,9 +251,9 @@ describe('FIO', () => { it('should capture async exceptions', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - FIO.uninterruptibleIO(rej => rej(new Error('Bye'))) - .catch(err => FIO.of(err.message)) - .provide({runtime}) + FIO.uninterruptibleIO(rej => rej(new Error('Bye'))).catch(err => + FIO.of(err.message) + ) ) const expected = 'Bye' assert.strictEqual(actual, expected) @@ -277,7 +267,6 @@ describe('FIO', () => { .catch(err => FIO.reject(new Error(err.message + 'C'))) .catch(err => FIO.reject(new Error(err.message + 'D'))) .catch(err => FIO.of(err.message + 'E')) - .provide({runtime}) ) const expected = 'ABCDE' @@ -314,12 +303,7 @@ describe('FIO', () => { it('should handle concurrent set', () => { const counter = new Counter() const runtime = testRuntime() - const memoized = runtime.unsafeExecuteSync( - counter - .inc() - .delay(100) - .once.provide({runtime}) - ) + const memoized = runtime.unsafeExecuteSync(counter.inc().delay(100).once) // Schedule first run at 10ms runtime.scheduler.runTo(10) @@ -345,7 +329,6 @@ describe('FIO', () => { .inc() .delay(100) .once.chain(_ => _.and(_)) - .provide({runtime}) ) const actual = counter.count @@ -364,11 +347,7 @@ describe('FIO', () => { it('should complete immediately', () => { const runtime = testRuntime() const counter = new Counter() - runtime.unsafeExecute( - FIO.timeout('A', 1000) - .fork.and(counter.inc()) - .provide({runtime}) - ) + runtime.unsafeExecute(FIO.timeout('A', 1000).fork.and(counter.inc())) runtime.scheduler.runTo(10) assert.isTrue(counter.increased) }) @@ -403,7 +382,6 @@ describe('FIO', () => { .inc() .delay(1000) .fork.chain(fiber => fiber.join.delay(100)) - .provide({runtime}) ) const expected = 1 @@ -418,7 +396,6 @@ describe('FIO', () => { FIO.of(10) .delay(100) .fork.chain(fib => fib.join.and(counter.inc())) - .provide({runtime}) ) runtime.scheduler.runTo(50) @@ -496,9 +473,7 @@ describe('FIO', () => { const left = FIO.of(10).delay(1500) const right = FIO.of(20).delay(1000) const runtime = testRuntime() - runtime.unsafeExecuteSync( - left.zipWithPar(right, (a, b) => [a, b]).provide({runtime}) - ) + runtime.unsafeExecuteSync(left.zipWithPar(right, (a, b) => [a, b])) const actual = runtime.scheduler.now() assert.strictEqual(actual, 1501) @@ -510,7 +485,7 @@ describe('FIO', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - left.zipWithPar(right, (a, b) => [a, b]).provide({runtime}) + left.zipWithPar(right, (a, b) => [a, b]) ) assert.deepEqual(actual, [10, 20]) }) @@ -522,9 +497,7 @@ describe('FIO', () => { const right = counter.inc().delay(1000) const runtime = testRuntime() - runtime.unsafeExecuteSync( - left.zipWithPar(right, (a, b) => [a, b]).provide({runtime}) - ) + runtime.unsafeExecuteSync(left.zipWithPar(right, (a, b) => [a, b])) assert.deepEqual(counter.count, 0) }) @@ -535,7 +508,7 @@ describe('FIO', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - L.zipWithPar(R, (l, r) => 0).provide({runtime}) + L.zipWithPar(R, (l, r) => 0) ) as Error const expected = new Error(ERROR_MESSAGE) @@ -551,7 +524,7 @@ describe('FIO', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - A.zipWithPar(B, (l, r) => 0).provide({runtime}) + A.zipWithPar(B, (l, r) => 0) ) as Error const expected = new Error(ERROR_MESSAGE) @@ -568,9 +541,7 @@ describe('FIO', () => { const b = snapshot.mark('B').delay(2000) const runtime = testRuntime() - runtime.unsafeExecuteSync( - a.raceWith(b, FIO.void, FIO.void).provide({runtime}) - ) + runtime.unsafeExecuteSync(a.raceWith(b, FIO.void, FIO.void)) assert.deepEqual(snapshot.timeline, ['A@1001', 'B@2001']) }) @@ -583,9 +554,7 @@ describe('FIO', () => { const runtime = testRuntime() runtime.unsafeExecuteSync( - F1.raceWith(F2, FIO.void, FIO.void) - .and(snapshot.mark('C')) - .provide({runtime}) + F1.raceWith(F2, FIO.void, FIO.void).and(snapshot.mark('C')) ) assert.deepEqual(snapshot.timeline, ['A@1001', 'B@2001', 'C@2001']) @@ -604,9 +573,7 @@ describe('FIO', () => { F2, (E, F) => F.abort.and(FIO.fromEither(E)), (E, F) => F.abort.and(FIO.fromEither(E)) - ) - .and(snapshot.mark('C')) - .provide({runtime}) + ).and(snapshot.mark('C')) ) assert.deepEqual(snapshot.timeline, ['A@1001', 'C@1001']) @@ -636,7 +603,7 @@ describe('FIO', () => { const cbB = spy(FIO.void) const runtime = testRuntime() - runtime.unsafeExecuteSync(a.raceWith(b, cbA, cbB).provide({runtime})) + runtime.unsafeExecuteSync(a.raceWith(b, cbA, cbB)) cbA.should.be.called() cbB.should.be.called() @@ -648,7 +615,7 @@ describe('FIO', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - a.raceWith(b, () => FIO.of(10), () => FIO.of(20)).provide({runtime}) + a.raceWith(b, () => FIO.of(10), () => FIO.of(20)) ) assert.strictEqual(actual, 10) @@ -663,7 +630,7 @@ describe('FIO', () => { const b = snapshot.mark('B').delay(2000) const runtime = testRuntime() - runtime.unsafeExecuteSync(a.race(b).provide({runtime})) + runtime.unsafeExecuteSync(a.race(b)) assert.deepEqual(snapshot.timeline, ['A@1001']) }) @@ -673,7 +640,7 @@ describe('FIO', () => { const b = FIO.of('B').delay(1000) const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync(a.race(b).provide({runtime})) + const actual = runtime.unsafeExecuteSync(a.race(b)) assert.strictEqual(actual, 'A') }) @@ -738,7 +705,7 @@ describe('FIO', () => { const actual = runtime.unsafeExecuteSync( FIO.access((_: {n: number}) => _.n + 1) .delay(1000) - .provide({n: 10, runtime}) + .provide({n: 10}) ) const expected = 11 @@ -755,7 +722,7 @@ describe('FIO', () => { ]) const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync(io.provide({runtime})) + const actual = runtime.unsafeExecuteSync(io) const expected = [10, 20, 30] assert.deepStrictEqual(actual, expected) @@ -781,7 +748,7 @@ describe('FIO', () => { ]) const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync(io.provide({runtime})) + const actual = runtime.unsafeExecuteSync(io) const expected = [10, 20, 30] assert.deepStrictEqual(actual, expected) @@ -803,7 +770,7 @@ describe('FIO', () => { const io = FIO.parN(1, [ID.delay(10), ID.delay(20), ID.delay(30)]) const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync(io.provide({runtime})) + const actual = runtime.unsafeExecuteSync(io) const expected = [11, 31, 61] assert.deepStrictEqual(actual, expected) @@ -815,7 +782,7 @@ describe('FIO', () => { const io = FIO.parN(Infinity, [ID.delay(10), ID.delay(20), ID.delay(30)]) const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync(io.provide({runtime})) + const actual = runtime.unsafeExecuteSync(io) const expected = [11, 21, 31] assert.deepStrictEqual(actual, expected) @@ -834,7 +801,7 @@ describe('FIO', () => { it('should capture exceptions from Node API', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - FIO.node(cb => cb(new Error('Failed'))).provide({runtime}) + FIO.node(cb => cb(new Error('Failed'))) ) const expected = new Error('Failed') @@ -846,7 +813,7 @@ describe('FIO', () => { const actual = runtime.unsafeExecuteSync( FIO.node(cb => { throw new Error('Failed') - }).provide({runtime}) + }) ) const expected = new Error('Failed') @@ -857,7 +824,7 @@ describe('FIO', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( // tslint:disable-next-line: no-null-keyword - FIO.node(cb => cb(null, 1000)).provide({runtime}) + FIO.node(cb => cb(null, 1000)) ) const expected = 1000 @@ -869,9 +836,9 @@ describe('FIO', () => { it('should give access to current runtime', () => { const runtime = testRuntime() - const actual = runtime.unsafeExecuteSync(FIO.runtime().provide({runtime})) + const actual = runtime.unsafeExecuteSync(FIO.runtime()) - assert.strictEqual(actual, runtime as IRuntime) + assert.strictEqual(actual, runtime) }) }) diff --git a/test/FStream.test.ts b/test/FStream.test.ts index 39125b7f..0caaceca 100644 --- a/test/FStream.test.ts +++ b/test/FStream.test.ts @@ -62,7 +62,6 @@ describe('FStream', () => { FStream.of('A') .merge(FStream.of('B')) .forEach(_ => actual.mark(_)) - .provide({runtime}) ) const expected = ['A@1', 'B@1'] diff --git a/test/FiberContext.test.ts b/test/FiberContext.test.ts index 7b6f97fd..153fa66c 100644 --- a/test/FiberContext.test.ts +++ b/test/FiberContext.test.ts @@ -1,6 +1,5 @@ import {assert, spy} from 'chai' import {Either, Option} from 'standard-data-structures' -import {testScheduler} from 'ts-scheduler/test' import {FiberContext} from '../src/internals/Fiber' import {FIO} from '../src/main/FIO' @@ -15,9 +14,7 @@ describe('FiberContext', () => { context('scheduler idle', () => { it('should not execute', () => { const counter = new Counter() - const scheduler = testScheduler() - - FiberContext.evaluateWith(counter.inc(), scheduler) + FiberContext.unsafeExecute(counter.inc(), testRuntime()) assert.strictEqual(counter.count, 0) }) @@ -26,10 +23,10 @@ describe('FiberContext', () => { context('scheduler triggers', () => { it('should execute', () => { const counter = new Counter() - const scheduler = testScheduler() + const runtime = testRuntime() - FiberContext.evaluateWith(counter.inc(), scheduler) - scheduler.run() + FiberContext.unsafeExecute(counter.inc(), runtime) + runtime.scheduler.run() assert.strictEqual(counter.count, 1) }) @@ -39,36 +36,36 @@ describe('FiberContext', () => { context('on cancellation', () => { it('should not execute', () => { const counter = new Counter() - const scheduler = testScheduler() + const runtime = testRuntime() - const context = FiberContext.evaluateWith(counter.inc(), scheduler) + const context = FiberContext.unsafeExecute(counter.inc(), runtime) context.cancel() - scheduler.run() + runtime.scheduler.run() assert.strictEqual(counter.count, 0) }) it('should callback with none', () => { - const scheduler = testScheduler() + const runtime = testRuntime() const cb = spy() - const context = FiberContext.evaluateWith(FIO.of(0), scheduler) + const context = FiberContext.unsafeExecute(FIO.of(0), runtime) context.unsafeObserve(cb) context.cancel() - scheduler.run() + runtime.scheduler.run() cb.should.be.called.with(Option.none()) }) context('observer is added', () => { it('should call back with none', () => { - const scheduler = testScheduler() + const runtime = testRuntime() const cb = spy() - const context = FiberContext.evaluateWith(FIO.of(0), scheduler) + const context = FiberContext.unsafeExecute(FIO.of(0), runtime) context.cancel() context.unsafeObserve(cb) - scheduler.run() + runtime.scheduler.run() cb.should.be.called.with(Option.none()) }) @@ -77,13 +74,13 @@ describe('FiberContext', () => { context('on observer cancellation', () => { it('should not call observers', () => { - const scheduler = testScheduler() + const runtime = testRuntime() const cb = spy() - FiberContext.evaluateWith(FIO.of(0), scheduler) + FiberContext.unsafeExecute(FIO.of(0), runtime) .unsafeObserve(cb) .cancel() - scheduler.run() + runtime.scheduler.run() cb.should.be.not.be.called() }) @@ -91,11 +88,11 @@ describe('FiberContext', () => { context('on error', () => { it('should call rej with cause', () => { - const scheduler = testScheduler() + const runtime = testRuntime() const cb = spy() - FiberContext.evaluateWith(FIO.reject(1), scheduler).unsafeObserve(cb) - scheduler.run() + FiberContext.unsafeExecute(FIO.reject(1), runtime).unsafeObserve(cb) + runtime.scheduler.run() cb.should.called.with(Option.some(Either.left(1))) }) @@ -103,11 +100,11 @@ describe('FiberContext', () => { context('on success', () => { it('should call res with value', () => { - const scheduler = testScheduler() + const runtime = testRuntime() const cb = spy() - FiberContext.evaluateWith(FIO.of(1), scheduler).unsafeObserve(cb) - scheduler.run() + FiberContext.unsafeExecute(FIO.of(1), runtime).unsafeObserve(cb) + runtime.scheduler.run() cb.should.called.with(Option.some(Either.left(1))) }) @@ -115,25 +112,25 @@ describe('FiberContext', () => { context('on completed', () => { it('should call res with computed result', () => { - const scheduler = testScheduler() + const runtime = testRuntime() const cb = spy() - const context = FiberContext.evaluateWith(FIO.of(1), scheduler) - scheduler.run() + const context = FiberContext.unsafeExecute(FIO.of(1), runtime) + runtime.scheduler.run() context.unsafeObserve(cb) - scheduler.run() + runtime.scheduler.run() cb.should.called.with(Option.some(Either.right(1))) }) it('should call rej with computed cause', () => { - const scheduler = testScheduler() + const runtime = testRuntime() const cb = spy() - const context = FiberContext.evaluateWith(FIO.reject(1), scheduler) - scheduler.run() + const context = FiberContext.unsafeExecute(FIO.reject(1), runtime) + runtime.scheduler.run() context.unsafeObserve(cb) - scheduler.run() + runtime.scheduler.run() cb.should.called.with(Option.some(Either.left(1))) }) @@ -143,16 +140,9 @@ describe('FiberContext', () => { it('should wait for completion', () => { const runtime = testRuntime() const snapshot = new Snapshot() - const scheduler = runtime.scheduler - - FiberContext.evaluateWith( - snapshot - .mark('A') - .delay(1000) - .provide({runtime}), - scheduler - ) - scheduler.run() + + FiberContext.unsafeExecute(snapshot.mark('A').delay(1000), runtime) + runtime.scheduler.run() assert.deepStrictEqual(snapshot.timeline, ['A@1001']) }) @@ -162,21 +152,15 @@ describe('FiberContext', () => { it('should be executed in parallel', () => { const runtime = testRuntime() const snapshot = new Snapshot() - const scheduler = runtime.scheduler - const A = snapshot - .mark('A') - .delay(1000) - .provide({runtime}) - const B = snapshot - .mark('B') - .delay(2000) - .provide({runtime}) + const A = snapshot.mark('A').delay(1000) + + const B = snapshot.mark('B').delay(2000) - FiberContext.evaluateWith(A, scheduler) - FiberContext.evaluateWith(B, scheduler) + FiberContext.unsafeExecute(A, runtime) + FiberContext.unsafeExecute(B, runtime) - scheduler.run() + runtime.scheduler.run() assert.deepStrictEqual(snapshot.timeline, ['A@1001', 'B@2001']) }) @@ -203,16 +187,15 @@ describe('FiberContext', () => { const list = new Array() const insert = FIO.encase((_: number) => void list.push(_)) - const scheduler = testScheduler() - FiberContext.evaluateWith( + const runtime = testRuntime({maxInstructionCount: 5}) + FiberContext.unsafeExecute( FStream.range(101, 103) .merge(FStream.range(901, 903)) .mapM(insert).drain, - scheduler, - 5 + runtime ) - scheduler.run() + runtime.scheduler.run() const expected = [901, 101, 102, 103, 902, 903] assert.deepStrictEqual(list, expected) @@ -220,7 +203,7 @@ describe('FiberContext', () => { it('should switch between multiple contexts', () => { const MAX_INSTRUCTION_COUNT = 5 - const scheduler = testScheduler() + const runtime = testRuntime({maxInstructionCount: MAX_INSTRUCTION_COUNT}) const actual = new Array() const insert = FIO.encase((_: number) => void actual.push(_)) const longIO = FIO.of(1) @@ -231,10 +214,10 @@ describe('FiberContext', () => { .chain(insert) const shortIO = FIO.of(1000).chain(insert) - FiberContext.evaluateWith(longIO, scheduler, MAX_INSTRUCTION_COUNT) - FiberContext.evaluateWith(shortIO, scheduler, 5) + FiberContext.unsafeExecute(longIO, runtime) + FiberContext.unsafeExecute(shortIO, runtime) - scheduler.run() + runtime.scheduler.run() const expected = [1000, 5] assert.deepStrictEqual(actual, expected) diff --git a/test/Managed.test.ts b/test/Managed.test.ts index 56cd6935..e9a970c3 100644 --- a/test/Managed.test.ts +++ b/test/Managed.test.ts @@ -63,7 +63,6 @@ describe('Managed', () => { Managed.make(r.acquire, r.release) .use(() => FIO.timeout(0, 1000)) .fork.chain(F => F.abort.delay(500)) - .provide({runtime}) ) assert.ok(r.isReleased) @@ -77,7 +76,6 @@ describe('Managed', () => { Managed.make(r.acquire, r.release) .use(() => FIO.timeout(0, 1000)) .fork.chain(F => F.join.and(F.abort)) - .provide({runtime}) ) assert.strictEqual(r.count, 0) diff --git a/test/Queue.test.ts b/test/Queue.test.ts index 5fa29534..8d28aee3 100644 --- a/test/Queue.test.ts +++ b/test/Queue.test.ts @@ -56,9 +56,9 @@ describe('Queue', () => { it('should wait if the queue is empty', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - Queue.unbounded() - .chain(Q => Q.take.par(Q.offer('A').delay(1000)).map(_ => _[0])) - .provide({runtime}) + Queue.unbounded().chain(Q => + Q.take.par(Q.offer('A').delay(1000)).map(_ => _[0]) + ) ) const expected = 'A' @@ -68,9 +68,9 @@ describe('Queue', () => { it('should empty the queue once resolved', () => { const runtime = testRuntime() const actual = runtime.unsafeExecuteSync( - Queue.unbounded() - .chain(Q => Q.take.par(Q.offer('A').delay(1000)).and(Q.length)) - .provide({runtime}) + Queue.unbounded().chain(Q => + Q.take.par(Q.offer('A').delay(1000)).and(Q.length) + ) ) assert.strictEqual(actual, 0) diff --git a/test/internals/Snapshot.ts b/test/internals/Snapshot.ts index 575c1ac1..4b661a2d 100644 --- a/test/internals/Snapshot.ts +++ b/test/internals/Snapshot.ts @@ -2,11 +2,10 @@ * Created by tushar on 07/09/19 */ import {FIO, UIO} from '../../src/main/FIO' -import {IRuntimeEnv} from '../../src/runtimes/IRuntime' export class Snapshot { public readonly timeline = new Array() - public mark(value: T): FIO { + public mark(value: T): UIO { return FIO.runtime().chain(RTM => UIO( () => void this.timeline.push(value + '@' + RTM.scheduler.now()) From 995ebd6766120cb687a1720b36edda223a24f5e9 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 19 Oct 2019 09:41:36 +0530 Subject: [PATCH 08/14] refactor(fiber): rename static function `unsafeExecute` to `unsafeExecuteWith` --- src/internals/Fiber.ts | 12 +++++------- src/runtimes/BaseRuntime.ts | 2 +- test/FiberContext.test.ts | 32 ++++++++++++++++---------------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/internals/Fiber.ts b/src/internals/Fiber.ts index e403d3f6..4f027406 100644 --- a/src/internals/Fiber.ts +++ b/src/internals/Fiber.ts @@ -29,22 +29,20 @@ enum FiberStatus { /** * Fibers are data structures that provide you a handle to control the execution of its `IO`. - * They can be created by calling the [[FIO.fork]] method. - * Fiber created is always going to be in a `Paused` state. To resume the fiber, you should call the `resume` or the `resumeAsync` methods. * @typeparam E Exceptions that can be thrown * @typeparam A The success value */ export abstract class Fiber { /** - * Executes the provided [[FIO]] expression. + * Uses a shared runtime to evaluate a [[FIO]] expression. * Returns a [[ICancellable]] that can be used to interrupt the execution. */ - public static unsafeExecute( + public static unsafeExecuteWith( io: IO, runtime: IRuntime, cb?: CBOption ): ICancellable { - return FiberContext.unsafeExecute(io, runtime, cb) + return FiberContext.unsafeExecuteWith(io, runtime, cb) } public abstract abort: UIO public abstract await: UIO>> @@ -75,7 +73,7 @@ export class FiberContext extends Fiber implements ICancellable { /** * Evaluates an IO using the provided scheduler */ - public static unsafeExecute( + public static unsafeExecuteWith( io: IO, runtime: IRuntime, cb?: CBOption @@ -293,7 +291,7 @@ export class FiberContext extends Fiber implements ICancellable { private unsafeRelease(p: UIO): void { this.cancellationList.push({ - cancel: () => Fiber.unsafeExecute(p, this.runtime) + cancel: () => Fiber.unsafeExecuteWith(p, this.runtime) }) } } diff --git a/src/runtimes/BaseRuntime.ts b/src/runtimes/BaseRuntime.ts index ffc7c589..beda71cb 100644 --- a/src/runtimes/BaseRuntime.ts +++ b/src/runtimes/BaseRuntime.ts @@ -16,6 +16,6 @@ export abstract class BaseRuntime implements IRuntime { ) {} public unsafeExecute(io: FIO, cb?: CBOption): ICancellable { - return Fiber.unsafeExecute(io, this, cb) + return Fiber.unsafeExecuteWith(io, this, cb) } } diff --git a/test/FiberContext.test.ts b/test/FiberContext.test.ts index 153fa66c..51f10a74 100644 --- a/test/FiberContext.test.ts +++ b/test/FiberContext.test.ts @@ -14,7 +14,7 @@ describe('FiberContext', () => { context('scheduler idle', () => { it('should not execute', () => { const counter = new Counter() - FiberContext.unsafeExecute(counter.inc(), testRuntime()) + FiberContext.unsafeExecuteWith(counter.inc(), testRuntime()) assert.strictEqual(counter.count, 0) }) @@ -25,7 +25,7 @@ describe('FiberContext', () => { const counter = new Counter() const runtime = testRuntime() - FiberContext.unsafeExecute(counter.inc(), runtime) + FiberContext.unsafeExecuteWith(counter.inc(), runtime) runtime.scheduler.run() assert.strictEqual(counter.count, 1) @@ -38,7 +38,7 @@ describe('FiberContext', () => { const counter = new Counter() const runtime = testRuntime() - const context = FiberContext.unsafeExecute(counter.inc(), runtime) + const context = FiberContext.unsafeExecuteWith(counter.inc(), runtime) context.cancel() runtime.scheduler.run() @@ -49,7 +49,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const cb = spy() - const context = FiberContext.unsafeExecute(FIO.of(0), runtime) + const context = FiberContext.unsafeExecuteWith(FIO.of(0), runtime) context.unsafeObserve(cb) context.cancel() runtime.scheduler.run() @@ -62,7 +62,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const cb = spy() - const context = FiberContext.unsafeExecute(FIO.of(0), runtime) + const context = FiberContext.unsafeExecuteWith(FIO.of(0), runtime) context.cancel() context.unsafeObserve(cb) runtime.scheduler.run() @@ -77,7 +77,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const cb = spy() - FiberContext.unsafeExecute(FIO.of(0), runtime) + FiberContext.unsafeExecuteWith(FIO.of(0), runtime) .unsafeObserve(cb) .cancel() runtime.scheduler.run() @@ -91,7 +91,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const cb = spy() - FiberContext.unsafeExecute(FIO.reject(1), runtime).unsafeObserve(cb) + FiberContext.unsafeExecuteWith(FIO.reject(1), runtime).unsafeObserve(cb) runtime.scheduler.run() cb.should.called.with(Option.some(Either.left(1))) @@ -103,7 +103,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const cb = spy() - FiberContext.unsafeExecute(FIO.of(1), runtime).unsafeObserve(cb) + FiberContext.unsafeExecuteWith(FIO.of(1), runtime).unsafeObserve(cb) runtime.scheduler.run() cb.should.called.with(Option.some(Either.left(1))) @@ -115,7 +115,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const cb = spy() - const context = FiberContext.unsafeExecute(FIO.of(1), runtime) + const context = FiberContext.unsafeExecuteWith(FIO.of(1), runtime) runtime.scheduler.run() context.unsafeObserve(cb) runtime.scheduler.run() @@ -127,7 +127,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const cb = spy() - const context = FiberContext.unsafeExecute(FIO.reject(1), runtime) + const context = FiberContext.unsafeExecuteWith(FIO.reject(1), runtime) runtime.scheduler.run() context.unsafeObserve(cb) runtime.scheduler.run() @@ -141,7 +141,7 @@ describe('FiberContext', () => { const runtime = testRuntime() const snapshot = new Snapshot() - FiberContext.unsafeExecute(snapshot.mark('A').delay(1000), runtime) + FiberContext.unsafeExecuteWith(snapshot.mark('A').delay(1000), runtime) runtime.scheduler.run() assert.deepStrictEqual(snapshot.timeline, ['A@1001']) @@ -157,8 +157,8 @@ describe('FiberContext', () => { const B = snapshot.mark('B').delay(2000) - FiberContext.unsafeExecute(A, runtime) - FiberContext.unsafeExecute(B, runtime) + FiberContext.unsafeExecuteWith(A, runtime) + FiberContext.unsafeExecuteWith(B, runtime) runtime.scheduler.run() @@ -188,7 +188,7 @@ describe('FiberContext', () => { const insert = FIO.encase((_: number) => void list.push(_)) const runtime = testRuntime({maxInstructionCount: 5}) - FiberContext.unsafeExecute( + FiberContext.unsafeExecuteWith( FStream.range(101, 103) .merge(FStream.range(901, 903)) .mapM(insert).drain, @@ -214,8 +214,8 @@ describe('FiberContext', () => { .chain(insert) const shortIO = FIO.of(1000).chain(insert) - FiberContext.unsafeExecute(longIO, runtime) - FiberContext.unsafeExecute(shortIO, runtime) + FiberContext.unsafeExecuteWith(longIO, runtime) + FiberContext.unsafeExecuteWith(shortIO, runtime) runtime.scheduler.run() From a468441977617b354d615e8719446b7fe14dcdd7 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 19 Oct 2019 09:43:01 +0530 Subject: [PATCH 09/14] refactor(fio): update env type --- src/main/FIO.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/FIO.ts b/src/main/FIO.ts index bcbaa1f4..0f056e32 100644 --- a/src/main/FIO.ts +++ b/src/main/FIO.ts @@ -121,7 +121,7 @@ export class FIO { */ public static accessP( cb: (R: R1) => Promise - ): FIO { + ): FIO { return FIO.env().chain(FIO.encaseP(cb)) } From 3830c814e71fbb04e14ae2be9d15f8536251cffb Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 19 Oct 2019 17:52:39 +0530 Subject: [PATCH 10/14] feat(fio): add forkWith method --- src/main/FIO.ts | 14 +++++++++++++- src/runtimes/BaseRuntime.ts | 2 +- src/runtimes/DefaultRuntime.ts | 5 +++++ src/runtimes/IRuntime.ts | 1 + src/runtimes/TestRuntime.ts | 7 ++++--- 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/main/FIO.ts b/src/main/FIO.ts index 0f056e32..87a458d4 100644 --- a/src/main/FIO.ts +++ b/src/main/FIO.ts @@ -76,7 +76,7 @@ export class FIO { } /** - * Returns a [[Fiber]]. The returned fiber is always in a paused state. + * Returns a [[Fiber]] */ public get fork(): FIO, R1> { return FIO.env().zipWithM(FIO.runtime(), (ENV, RTM) => @@ -587,6 +587,18 @@ export class FIO { return this.chain(FIO.encase(fn)) } + /** + * Returns a [[Fiber]] with a different `maxInstructionCount`. + */ + public forkWith(maxInstructionCount: number): FIO, R1> { + return FIO.env().zipWithM(FIO.runtime(), (ENV, RTM) => + FIO.fork( + this.provide(ENV), + RTM.setMaxInstructionCount(maxInstructionCount) + ) + ) + } + /** * Applies transformation on the success value of the FIO. */ diff --git a/src/runtimes/BaseRuntime.ts b/src/runtimes/BaseRuntime.ts index beda71cb..9528123a 100644 --- a/src/runtimes/BaseRuntime.ts +++ b/src/runtimes/BaseRuntime.ts @@ -14,7 +14,7 @@ export abstract class BaseRuntime implements IRuntime { public constructor( public readonly maxInstructionCount: number = Number.MAX_SAFE_INTEGER ) {} - + public abstract setMaxInstructionCount(maxInstructionCount: number): IRuntime public unsafeExecute(io: FIO, cb?: CBOption): ICancellable { return Fiber.unsafeExecuteWith(io, this, cb) } diff --git a/src/runtimes/DefaultRuntime.ts b/src/runtimes/DefaultRuntime.ts index 9fcd3f5e..e6aeeb6c 100644 --- a/src/runtimes/DefaultRuntime.ts +++ b/src/runtimes/DefaultRuntime.ts @@ -13,6 +13,11 @@ export class DefaultRuntime extends BaseRuntime { super(maxInstructionCount) } + // tslint:disable-next-line: prefer-function-over-method + public setMaxInstructionCount(maxInstructionCount: number): DefaultRuntime { + return new DefaultRuntime(maxInstructionCount) + } + public async unsafeExecutePromise(io: FIO): Promise { return new Promise((res, rej) => { this.unsafeExecute(io, O => O.map(_ => _.reduce(rej, res))) diff --git a/src/runtimes/IRuntime.ts b/src/runtimes/IRuntime.ts index 7aa3a864..e28d3777 100644 --- a/src/runtimes/IRuntime.ts +++ b/src/runtimes/IRuntime.ts @@ -16,6 +16,7 @@ import {FIO} from '../main/FIO' export interface IRuntime { maxInstructionCount: number scheduler: IScheduler + setMaxInstructionCount(maxInstructionCount: number): IRuntime /** * Executes the provided [[FIO]] expression. diff --git a/src/runtimes/TestRuntime.ts b/src/runtimes/TestRuntime.ts index 155ef1fa..d7897edc 100644 --- a/src/runtimes/TestRuntime.ts +++ b/src/runtimes/TestRuntime.ts @@ -16,17 +16,18 @@ type TestRuntimeOptions = Partial< > export class TestRuntime extends BaseRuntime { public readonly scheduler: TestScheduler - public constructor(options: TestRuntimeOptions) { + public constructor(private readonly options: TestRuntimeOptions) { super(options.maxInstructionCount) this.scheduler = testScheduler(options) } - + public setMaxInstructionCount(maxInstructionCount: number): TestRuntime { + return new TestRuntime({...this.options, maxInstructionCount}) + } public unsafeExecuteSync(io: IO): A | E | undefined { return this.unsafeExecuteSync0(io) .map(_ => _.reduce(Id, Id)) .getOrElse(undefined) } - public unsafeExecuteSync0(io: FIO): Option> { let result: Option> = Option.none() this.unsafeExecute(io, _ => (result = _)) From 31bc15bf2e5b5e6993cc4f900e82c9d8cf09ce6b Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 19 Oct 2019 18:03:58 +0530 Subject: [PATCH 11/14] test(fstream): add test for cooperative merging --- test/FStream.test.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/FStream.test.ts b/test/FStream.test.ts index 0caaceca..9c988817 100644 --- a/test/FStream.test.ts +++ b/test/FStream.test.ts @@ -67,6 +67,28 @@ describe('FStream', () => { const expected = ['A@1', 'B@1'] assert.deepStrictEqual(actual.timeline, expected) }) + context('lower maxInstructionCount', () => { + it('should interleave values from two ranges', () => { + const actual = new Array() + const insert = FIO.encase((_: number) => void actual.push(_)) + const MAX_INSTRUCTION_COUNT = 5 + const runtime = testRuntime({ + maxInstructionCount: MAX_INSTRUCTION_COUNT + }) + + runtime.unsafeExecuteSync( + FStream.range(101, 103) + .merge(FStream.range(901, 903)) + .mapM(insert).drain + ) + + runtime.scheduler.run() + + const expected = [101, 102, 103, 901, 902, 903] + assert.sameDeepMembers(actual, expected) + assert.notDeepEqual(actual, expected) + }) + }) }) describe('take', () => { From e43be51ae02baf235147e454d7e2613320a7fbb9 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 19 Oct 2019 18:19:24 +0530 Subject: [PATCH 12/14] refactor(fiber): fix op count issue --- src/internals/Fiber.ts | 2 +- test/FiberContext.test.ts | 46 ++++++++++++++++++++++++--------------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/internals/Fiber.ts b/src/internals/Fiber.ts index 4f027406..8c80f120 100644 --- a/src/internals/Fiber.ts +++ b/src/internals/Fiber.ts @@ -163,7 +163,6 @@ export class FiberContext extends Fiber implements ICancellable { let data: unknown = ddd let count = 0 while (true) { - count++ if (count === this.runtime.maxInstructionCount) { return this.init(data) } @@ -286,6 +285,7 @@ export class FiberContext extends Fiber implements ICancellable { } catch (e) { this.stackA.push(FIO.reject(e).asInstruction) } + count++ } } diff --git a/test/FiberContext.test.ts b/test/FiberContext.test.ts index 51f10a74..78f1ac8f 100644 --- a/test/FiberContext.test.ts +++ b/test/FiberContext.test.ts @@ -183,24 +183,6 @@ describe('FiberContext', () => { }) context('instruction count is reduced', () => { - it.skip('should cooperatively merge two streams', () => { - const list = new Array() - const insert = FIO.encase((_: number) => void list.push(_)) - - const runtime = testRuntime({maxInstructionCount: 5}) - FiberContext.unsafeExecuteWith( - FStream.range(101, 103) - .merge(FStream.range(901, 903)) - .mapM(insert).drain, - runtime - ) - - runtime.scheduler.run() - - const expected = [901, 101, 102, 103, 902, 903] - assert.deepStrictEqual(list, expected) - }) - it('should switch between multiple contexts', () => { const MAX_INSTRUCTION_COUNT = 5 const runtime = testRuntime({maxInstructionCount: MAX_INSTRUCTION_COUNT}) @@ -223,4 +205,32 @@ describe('FiberContext', () => { assert.deepStrictEqual(actual, expected) }) }) + + context('instruction count is zero', () => { + it('should not fail', () => { + const snapshot = new Snapshot() + const runtime = testRuntime({maxInstructionCount: 0}) + FiberContext.unsafeExecuteWith( + FIO.of('A').chain(_ => snapshot.mark(_)), + runtime + ) + runtime.scheduler.run() + + assert.deepStrictEqual(snapshot.timeline, ['A@1']) + }) + }) + + context('instruction count is negative', () => { + it('should not fail', () => { + const snapshot = new Snapshot() + const runtime = testRuntime({maxInstructionCount: -100}) + FiberContext.unsafeExecuteWith( + FIO.of('A').chain(_ => snapshot.mark(_)), + runtime + ) + runtime.scheduler.run() + + assert.deepStrictEqual(snapshot.timeline, ['A@1']) + }) + }) }) From 7428a1d84556d55f1fd14068f9dcfb049e988edc Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 19 Oct 2019 18:19:49 +0530 Subject: [PATCH 13/14] fix(runtime): handle invalid values inside runtime --- src/runtimes/BaseRuntime.ts | 10 +++++++--- test/Runtime.test.ts | 24 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 test/Runtime.test.ts diff --git a/src/runtimes/BaseRuntime.ts b/src/runtimes/BaseRuntime.ts index 9528123a..65ce82da 100644 --- a/src/runtimes/BaseRuntime.ts +++ b/src/runtimes/BaseRuntime.ts @@ -10,10 +10,14 @@ import {FIO} from '../main/FIO' import {IRuntime} from './IRuntime' export abstract class BaseRuntime implements IRuntime { + public readonly maxInstructionCount: number public abstract readonly scheduler: IScheduler - public constructor( - public readonly maxInstructionCount: number = Number.MAX_SAFE_INTEGER - ) {} + public constructor(maxInstructionCount: number = Number.MAX_SAFE_INTEGER) { + this.maxInstructionCount = Math.min( + Math.max(1, maxInstructionCount), + Number.MAX_SAFE_INTEGER + ) + } public abstract setMaxInstructionCount(maxInstructionCount: number): IRuntime public unsafeExecute(io: FIO, cb?: CBOption): ICancellable { return Fiber.unsafeExecuteWith(io, this, cb) diff --git a/test/Runtime.test.ts b/test/Runtime.test.ts new file mode 100644 index 00000000..d5cc93a6 --- /dev/null +++ b/test/Runtime.test.ts @@ -0,0 +1,24 @@ +import {assert} from 'chai' + +import {testRuntime} from '../src/runtimes/TestRuntime' + +describe('runtime', () => { + context('instruction count is Infinite', () => { + it('should set it to MAX_SAFE_INTEGER', () => { + const runtime = testRuntime({maxInstructionCount: Infinity}) + assert.strictEqual(runtime.maxInstructionCount, Number.MAX_SAFE_INTEGER) + }) + }) + context('instruction count is negative', () => { + it('should set it to 1', () => { + const runtime = testRuntime({maxInstructionCount: -100}) + assert.strictEqual(runtime.maxInstructionCount, 1) + }) + }) + context('instruction count is zero', () => { + it('should set it to 1', () => { + const runtime = testRuntime({maxInstructionCount: 0}) + assert.strictEqual(runtime.maxInstructionCount, 1) + }) + }) +}) From 860605ce32ab520a2e837d60de1d4631d9cb9b19 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 19 Oct 2019 18:41:48 +0530 Subject: [PATCH 14/14] style(lint): apply lint fixes --- src/main/FIO.ts | 2 +- src/main/FStream.ts | 19 ++++++++----------- test/FiberContext.test.ts | 1 - 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/main/FIO.ts b/src/main/FIO.ts index 87a458d4..9dfe29d7 100644 --- a/src/main/FIO.ts +++ b/src/main/FIO.ts @@ -476,7 +476,7 @@ export class FIO { /** * Tries to run an function that returns a promise. */ - public static tryP(cb: () => Promise): TaskR { + public static tryP(cb: () => Promise): Task { return FIO.encaseP(cb)() } diff --git a/src/main/FStream.ts b/src/main/FStream.ts index c616b38a..cb66da7a 100644 --- a/src/main/FStream.ts +++ b/src/main/FStream.ts @@ -41,7 +41,7 @@ export class FStream { /** * Creates a stream that constantly emits the provided value. */ - public static const(a: A1): FStream { + public static const(a: A1): Stream { return new FStream((s, cont, next) => FIO.if0()(() => cont(s), () => next(s, a), () => FIO.of(s)) ) @@ -50,12 +50,12 @@ export class FStream { /** * Create a stream from an array */ - public static fromArray(t: A[]): FStream { + public static fromArray(t: A1[]): Stream { return new FStream( ( state: S, cont: (s: S) => boolean, - next: (s: S, a: A) => FIO + next: (s: S, a: A1) => FIO ) => { const itar = (s: S, i: number): FIO => FIO.if0()( @@ -87,7 +87,7 @@ export class FStream { public static fromEventEmitter( ev: EventEmitter, name: string - ): FIO>> { + ): UIO>> { return FIO.runtime().zipWith(Queue.bounded(1), (RTM, Q) => { const onEvent = (a: A) => RTM.unsafeExecute(Q.offer(a)) @@ -101,7 +101,7 @@ export class FStream { /** * Creates a stream from a [[Queue]] */ - public static fromQueue(Q: Queue): FStream { + public static fromQueue(Q: Queue): Stream { return new FStream( ( state: S, @@ -123,17 +123,14 @@ export class FStream { /** * Creates a stream that emits after every given duration of time. */ - public static interval( - A1: A1, - duration: number - ): FStream { + public static interval(A1: A1, duration: number): Stream { return FStream.produce(FIO.timeout(A1, duration)) } /** * Creates a stream from the provided values */ - public static of(...t: A1[]): FStream { + public static of(...t: A1[]): Stream { return FStream.fromArray(t) } @@ -162,7 +159,7 @@ export class FStream { /** * Creates a stream that emits the given ranges of values */ - public static range(min: number, max: number): FStream { + public static range(min: number, max: number): Stream { return new FStream( ( state: S, diff --git a/test/FiberContext.test.ts b/test/FiberContext.test.ts index 78f1ac8f..e43360cd 100644 --- a/test/FiberContext.test.ts +++ b/test/FiberContext.test.ts @@ -3,7 +3,6 @@ import {Either, Option} from 'standard-data-structures' import {FiberContext} from '../src/internals/Fiber' import {FIO} from '../src/main/FIO' -import {FStream} from '../src/main/FStream' import {testRuntime} from '../src/runtimes/TestRuntime' import {Counter} from './internals/Counter'