From 4be694d66be48fc0cfdd15b830c6811fc7e0176f Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Fri, 18 Oct 2019 15:05:54 +0530 Subject: [PATCH] feat(fiber): fiber now can switch between multiple contexts based on instruction count --- src/internals/FiberContext.ts | 9 ++-- src/main/FIO.ts | 7 ++- src/main/Instructions.ts | 1 + src/main/Queue.ts | 86 ++++++++++++++++++----------------- test/FiberContext.test.ts | 44 ++++++++++++++++++ 5 files changed, 100 insertions(+), 47 deletions(-) diff --git a/src/internals/FiberContext.ts b/src/internals/FiberContext.ts index b1ddd182..086caf67 100644 --- a/src/internals/FiberContext.ts +++ b/src/internals/FiberContext.ts @@ -51,9 +51,10 @@ export class FiberContext implements ICancellable, IFiber { */ public static evaluateWith( io: IO, - scheduler: IScheduler + scheduler: IScheduler, + maxInstructionCount: number = Number.MAX_SAFE_INTEGER ): FiberContext { - return new FiberContext(io.asInstruction, scheduler) + return new FiberContext(io.asInstruction, scheduler, maxInstructionCount) } private static dispatchResult( @@ -74,7 +75,7 @@ export class FiberContext implements ICancellable, IFiber { private constructor( instruction: Instruction, private readonly scheduler: IScheduler, - private readonly maxInstructionCount: number = Number.MAX_SAFE_INTEGER + private readonly maxInstructionCount: number ) { this.stackA.push(instruction) this.init() @@ -196,7 +197,7 @@ export class FiberContext implements ICancellable, IFiber { // A new context is created so that computation from that instruction can happen separately. // and then join back into the current context. // Using the same stack will corrupt it completely. - const nContext = new FiberContext(j.i0, this.scheduler) + const nContext = new FiberContext(j.i0, this.scheduler, j.i1) this.cancellationList.push(nContext) data = nContext break diff --git a/src/main/FIO.ts b/src/main/FIO.ts index 46a1b7a0..1da1f652 100644 --- a/src/main/FIO.ts +++ b/src/main/FIO.ts @@ -280,8 +280,11 @@ export class FIO { /** * Creates a new [[Fiber]] to run the given [[IO]]. */ - public static fork(io: IO): UIO> { - return new FIO(Tag.Fork, io) + public static fork( + io: IO, + maxInstructionCount: number = Number.MAX_SAFE_INTEGER + ): UIO> { + return new FIO(Tag.Fork, io, maxInstructionCount) } /** diff --git a/src/main/Instructions.ts b/src/main/Instructions.ts index d4d286fb..0268093f 100644 --- a/src/main/Instructions.ts +++ b/src/main/Instructions.ts @@ -69,6 +69,7 @@ export interface INever { } export interface IFork { i0: Instruction + i1: number // TODO: Instead of number pass runtime. tag: Tag.Fork } export interface IAccess { diff --git a/src/main/Queue.ts b/src/main/Queue.ts index adcbdbad..27e997b4 100644 --- a/src/main/Queue.ts +++ b/src/main/Queue.ts @@ -1,6 +1,4 @@ -import {LinkedListNode, List} from 'standard-data-structures' - -import {PureMutableList} from '../internals/PureMutableList' +import {DoublyLinkedList, List, Option} from 'standard-data-structures' import {Await} from './Await' import {FIO, NoEnv, UIO} from './FIO' @@ -14,41 +12,44 @@ export class Queue { * Returns the Queue as an array */ public get asArray(): UIO { - return this.Q.asArray + return UIO(() => this.Q.asArray) } /** * Returns the number of elements in the queue */ public get length(): UIO { - return this.Q.length + return UIO(() => this.Q.length) } /** * Pulls an item from the queue */ public get take(): UIO { - return this.Q.shift.chain(sz => - sz - .map(FIO.of) - .getOrElse( - FIO.flatten( - Await.of().chain( - FIO.encase(await => this.T.add(await).and(await.get)) - ) - ) + return FIO.flattenM(() => { + const sz = this.Q.shift() + + if (Option.isSome(sz)) { + return FIO.of(sz.value) + } + + return FIO.flatten( + Await.of().chain( + FIO.encase(await => { + this.T.add(await) + + return await.get + }) ) - ) + ) + }) } /** * Creates a new bounded Queue */ public static bounded(capacity: number): UIO> { - return PureMutableList.of().zipWith( - PureMutableList.of>(), - (Q, T) => new Queue(capacity, Q, T) - ) + return UIO(() => new Queue(capacity)) } /** @@ -58,24 +59,40 @@ export class Queue { return Queue.bounded(Number.MAX_SAFE_INTEGER) } - private constructor( - public readonly capacity: number, - private readonly Q: PureMutableList, - private readonly T: PureMutableList> - ) {} + private readonly Q = DoublyLinkedList.of() + private readonly T = DoublyLinkedList.of>() + private constructor(public readonly capacity: number) {} /** * Inserts an item into the queue */ - public offer(a: A): UIO> { - return this.Q.add(a).tap(_ => this.setAwaited(_.value)) + public offer(a: A): UIO { + return FIO.flattenM( + (): UIO => { + if (this.T.length === 0) { + this.Q.add(a) + + return FIO.void() + } + + const io = new Array>() + while (this.T.length !== 0) { + const item = this.T.shift() + if (Option.isSome(item)) { + io.push(item.value.set(FIO.of(a))) + } + } + + return FIO.seq(io).void + } + ) } /** * Adds all the provided items into the queue */ - public offerAll(...a: A[]): UIO>> { - return FIO.seq(a.map(_ => this.offer(_))) + public offerAll(...a: A[]): UIO { + return FIO.seq(a.map(_ => this.offer(_))).void } /** @@ -92,19 +109,6 @@ export class Queue { return itar(0, List.empty()).map(_ => _.asArray) } - private setAwaited(value: A): UIO { - const itar = (list: List>): UIO>> => - this.T.shift.chain(_ => - _.map(AWT => itar(list.prepend(AWT.set(FIO.of(value))))).getOrElse( - FIO.of(list) - ) - ) - - return itar(List.empty>()) - .tap(_ => (!_.isEmpty ? this.Q.shift : FIO.void())) - .chain(_ => FIO.seq(_.asArray)) - } - /** * Converts a queue into a [[FStream]] */ diff --git a/test/FiberContext.test.ts b/test/FiberContext.test.ts index ff7a86b1..7bd16867 100644 --- a/test/FiberContext.test.ts +++ b/test/FiberContext.test.ts @@ -4,6 +4,7 @@ import {testScheduler} from 'ts-scheduler/test' import {FiberContext} from '../src/internals/FiberContext' import {FIO} from '../src/main/FIO' +import {FStream} from '../src/main/FStream' import {testRuntime} from '../src/runtimes/TestRuntime' import {Counter} from './internals/Counter' @@ -196,4 +197,47 @@ describe('FiberContext', () => { it.skip('should return none') }) }) + + context('instruction count is reduced', () => { + it.skip('should cooperatively merge two streams', () => { + const list = new Array() + const insert = FIO.encase((_: number) => void list.push(_)) + + const scheduler = testScheduler() + FiberContext.evaluateWith( + FStream.range(101, 103) + .merge(FStream.range(901, 903)) + .mapM(insert).drain, + scheduler, + 5 + ) + + scheduler.run() + + const expected = [901, 101, 102, 103, 902, 903] + assert.deepStrictEqual(list, expected) + }) + + it('should switch between multiple contexts', () => { + const MAX_INSTRUCTION_COUNT = 5 + const scheduler = testScheduler() + const actual = new Array() + const insert = FIO.encase((_: number) => void actual.push(_)) + const longIO = FIO.of(1) + .and(FIO.of(2)) + .and(FIO.of(3)) + .and(FIO.of(4)) + .and(FIO.of(5)) + .chain(insert) + const shortIO = FIO.of(1000).chain(insert) + + FiberContext.evaluateWith(longIO, scheduler, MAX_INSTRUCTION_COUNT) + FiberContext.evaluateWith(shortIO, scheduler, 5) + + scheduler.run() + + const expected = [1000, 5] + assert.deepStrictEqual(actual, expected) + }) + }) })