Skip to content

Commit

Permalink
feat(fiber): fiber now can switch between multiple contexts based on …
Browse files Browse the repository at this point in the history
…instruction count
  • Loading branch information
tusharmath committed Oct 18, 2019
1 parent 3ac6dd3 commit 4be694d
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 47 deletions.
9 changes: 5 additions & 4 deletions src/internals/FiberContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
*/
public static evaluateWith<E, A>(
io: IO<E, A>,
scheduler: IScheduler
scheduler: IScheduler,
maxInstructionCount: number = Number.MAX_SAFE_INTEGER
): FiberContext<E, A> {
return new FiberContext(io.asInstruction, scheduler)
return new FiberContext(io.asInstruction, scheduler, maxInstructionCount)
}

private static dispatchResult<E, A>(
Expand All @@ -74,7 +75,7 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
private constructor(
instruction: Instruction,
private readonly scheduler: IScheduler,
private readonly maxInstructionCount: number = Number.MAX_SAFE_INTEGER
private readonly maxInstructionCount: number
) {
this.stackA.push(instruction)
this.init()
Expand Down Expand Up @@ -196,7 +197,7 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
// A new context is created so that computation from that instruction can happen separately.
// and then join back into the current context.
// Using the same stack will corrupt it completely.
const nContext = new FiberContext(j.i0, this.scheduler)
const nContext = new FiberContext(j.i0, this.scheduler, j.i1)
this.cancellationList.push(nContext)
data = nContext
break
Expand Down
7 changes: 5 additions & 2 deletions src/main/FIO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,11 @@ export class FIO<E1 = unknown, A1 = unknown, R1 = NoEnv> {
/**
* Creates a new [[Fiber]] to run the given [[IO]].
*/
public static fork<E1, A1>(io: IO<E1, A1>): UIO<IFiber<E1, A1>> {
return new FIO(Tag.Fork, io)
public static fork<E1, A1>(
io: IO<E1, A1>,
maxInstructionCount: number = Number.MAX_SAFE_INTEGER
): UIO<IFiber<E1, A1>> {
return new FIO(Tag.Fork, io, maxInstructionCount)
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/main/Instructions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export interface INever {
}
export interface IFork {
i0: Instruction
i1: number // TODO: Instead of number pass runtime.
tag: Tag.Fork
}
export interface IAccess<X = unknown, Y = unknown> {
Expand Down
86 changes: 45 additions & 41 deletions src/main/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {LinkedListNode, List} from 'standard-data-structures'

import {PureMutableList} from '../internals/PureMutableList'
import {DoublyLinkedList, List, Option} from 'standard-data-structures'

import {Await} from './Await'
import {FIO, NoEnv, UIO} from './FIO'
Expand All @@ -14,41 +12,44 @@ export class Queue<A = never> {
* Returns the Queue as an array
*/
public get asArray(): UIO<A[]> {
return this.Q.asArray
return UIO(() => this.Q.asArray)
}

/**
* Returns the number of elements in the queue
*/
public get length(): UIO<number> {
return this.Q.length
return UIO(() => this.Q.length)
}

/**
* Pulls an item from the queue
*/
public get take(): UIO<A> {
return this.Q.shift.chain(sz =>
sz
.map(FIO.of)
.getOrElse(
FIO.flatten(
Await.of<never, A>().chain(
FIO.encase(await => this.T.add(await).and(await.get))
)
)
return FIO.flattenM(() => {
const sz = this.Q.shift()

if (Option.isSome(sz)) {
return FIO.of(sz.value)
}

return FIO.flatten(
Await.of<never, A>().chain(
FIO.encase(await => {
this.T.add(await)

return await.get
})
)
)
)
})
}

/**
* Creates a new bounded Queue
*/
public static bounded<A>(capacity: number): UIO<Queue<A>> {
return PureMutableList.of<A>().zipWith(
PureMutableList.of<Await<never, A>>(),
(Q, T) => new Queue(capacity, Q, T)
)
return UIO(() => new Queue(capacity))
}

/**
Expand All @@ -58,24 +59,40 @@ export class Queue<A = never> {
return Queue.bounded(Number.MAX_SAFE_INTEGER)
}

private constructor(
public readonly capacity: number,
private readonly Q: PureMutableList<A>,
private readonly T: PureMutableList<Await<never, A>>
) {}
private readonly Q = DoublyLinkedList.of<A>()
private readonly T = DoublyLinkedList.of<Await<never, A>>()
private constructor(public readonly capacity: number) {}

/**
* Inserts an item into the queue
*/
public offer(a: A): UIO<LinkedListNode<A>> {
return this.Q.add(a).tap(_ => this.setAwaited(_.value))
public offer(a: A): UIO<void> {
return FIO.flattenM(
(): UIO<void> => {
if (this.T.length === 0) {
this.Q.add(a)

return FIO.void()
}

const io = new Array<UIO<boolean>>()
while (this.T.length !== 0) {
const item = this.T.shift()
if (Option.isSome(item)) {
io.push(item.value.set(FIO.of(a)))
}
}

return FIO.seq(io).void
}
)
}

/**
* Adds all the provided items into the queue
*/
public offerAll(...a: A[]): UIO<Array<LinkedListNode<A>>> {
return FIO.seq(a.map(_ => this.offer(_)))
public offerAll(...a: A[]): UIO<void> {
return FIO.seq(a.map(_ => this.offer(_))).void
}

/**
Expand All @@ -92,19 +109,6 @@ export class Queue<A = never> {
return itar(0, List.empty<A>()).map(_ => _.asArray)
}

private setAwaited(value: A): UIO<boolean[]> {
const itar = (list: List<UIO<boolean>>): UIO<List<UIO<boolean>>> =>
this.T.shift.chain(_ =>
_.map(AWT => itar(list.prepend(AWT.set(FIO.of(value))))).getOrElse(
FIO.of(list)
)
)

return itar(List.empty<UIO<boolean>>())
.tap(_ => (!_.isEmpty ? this.Q.shift : FIO.void()))
.chain(_ => FIO.seq(_.asArray))
}

/**
* Converts a queue into a [[FStream]]
*/
Expand Down
44 changes: 44 additions & 0 deletions test/FiberContext.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {testScheduler} from 'ts-scheduler/test'

import {FiberContext} from '../src/internals/FiberContext'
import {FIO} from '../src/main/FIO'
import {FStream} from '../src/main/FStream'
import {testRuntime} from '../src/runtimes/TestRuntime'

import {Counter} from './internals/Counter'
Expand Down Expand Up @@ -196,4 +197,47 @@ describe('FiberContext', () => {
it.skip('should return none')
})
})

context('instruction count is reduced', () => {
it.skip('should cooperatively merge two streams', () => {
const list = new Array<number>()
const insert = FIO.encase((_: number) => void list.push(_))

const scheduler = testScheduler()
FiberContext.evaluateWith(
FStream.range(101, 103)
.merge(FStream.range(901, 903))
.mapM(insert).drain,
scheduler,
5
)

scheduler.run()

const expected = [901, 101, 102, 103, 902, 903]
assert.deepStrictEqual(list, expected)
})

it('should switch between multiple contexts', () => {
const MAX_INSTRUCTION_COUNT = 5
const scheduler = testScheduler()
const actual = new Array<number>()
const insert = FIO.encase((_: number) => void actual.push(_))
const longIO = FIO.of(1)
.and(FIO.of(2))
.and(FIO.of(3))
.and(FIO.of(4))
.and(FIO.of(5))
.chain(insert)
const shortIO = FIO.of(1000).chain(insert)

FiberContext.evaluateWith(longIO, scheduler, MAX_INSTRUCTION_COUNT)
FiberContext.evaluateWith(shortIO, scheduler, 5)

scheduler.run()

const expected = [1000, 5]
assert.deepStrictEqual(actual, expected)
})
})
})

0 comments on commit 4be694d

Please sign in to comment.