Skip to content

Commit

Permalink
Merge pull request #11 from tusharmath/refactor/fiber-2
Browse files Browse the repository at this point in the history
Refactor: Fiber
  • Loading branch information
tusharmath authored Oct 19, 2019
2 parents 2e88a90 + 860605c commit ef9d03b
Show file tree
Hide file tree
Showing 22 changed files with 404 additions and 314 deletions.
3 changes: 1 addition & 2 deletions example/guess-the-number/src/Run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ const runtime = defaultRuntime()
runtime.unsafeExecute(
program.provide({
math: Math,
runtime,
tty: {
getStrLn: FIO.pipeEnv(getStrLn, {process, readline, runtime}),
getStrLn: FIO.pipeEnv(getStrLn, {process, readline}),
putStrLn: FIO.pipeEnv(putStrLn, {console})
}
})
Expand Down
20 changes: 8 additions & 12 deletions example/guess-the-number/test/Program.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('Program', () => {
const math = MockMath()
const tty = MockTTY({})
const runtime = testRuntime()
runtime.unsafeExecuteSync(program.provide({math, tty, runtime}))
runtime.unsafeExecuteSync(program.provide({math, tty}))

assert.deepStrictEqual(tty.stdout, ['Greetings!', 'Enter your name: '])
})
Expand All @@ -57,7 +57,7 @@ describe('Program', () => {
'Enter your name: ': ['John']
})
const runtime = testRuntime()
runtime.unsafeExecuteSync(program.provide({math, tty, runtime}))
runtime.unsafeExecuteSync(program.provide({math, tty}))

assert.deepStrictEqual(tty.stdout, [
'Greetings!',
Expand All @@ -74,7 +74,7 @@ describe('Program', () => {
'Enter your name: ': ['John']
})
const runtime = testRuntime()
runtime.unsafeExecuteSync(program.provide({math, tty, runtime}))
runtime.unsafeExecuteSync(program.provide({math, tty}))

assert.deepStrictEqual(tty.stdout, [
'Greetings!',
Expand All @@ -94,7 +94,7 @@ describe('Program', () => {
'Enter your name: ': ['John']
})
const runtime = testRuntime()
runtime.unsafeExecuteSync(program.provide({math, tty, runtime}))
runtime.unsafeExecuteSync(program.provide({math, tty}))

assert.deepStrictEqual(tty.stdout, [
'Greetings!',
Expand All @@ -115,7 +115,7 @@ describe('Program', () => {
'Press ⏎ to continue (or will exit in 3sec): ': ['', '']
})
const runtime = testRuntime()
runtime.unsafeExecuteSync(program.provide({math, tty, runtime}))
runtime.unsafeExecuteSync(program.provide({math, tty}))

assert.deepStrictEqual(tty.stdout, [
'Greetings!',
Expand All @@ -138,9 +138,7 @@ describe('Program', () => {
'Press ⏎ to continue (or will exit in 3sec): ': ['']
})
const runtime = testRuntime()
const actual = runtime.unsafeExecuteSync(
canContinue.provide({tty, runtime})
)
const actual = runtime.unsafeExecuteSync(canContinue.provide({tty}))

assert.isTrue(actual)
})
Expand All @@ -149,16 +147,14 @@ describe('Program', () => {
it('should return false', () => {
const tty = MockTTY({})
const runtime = testRuntime()
const actual = runtime.unsafeExecuteSync(
canContinue.provide({tty, runtime})
)
const actual = runtime.unsafeExecuteSync(canContinue.provide({tty}))

assert.isFalse(actual)
})
it('should output goodbye', () => {
const tty = MockTTY({})
const runtime = testRuntime()
runtime.unsafeExecuteSync(canContinue.provide({tty, runtime}))
runtime.unsafeExecuteSync(canContinue.provide({tty}))

assert.deepStrictEqual(tty.stdout, [
'Press ⏎ to continue (or will exit in 3sec): ',
Expand Down
2 changes: 1 addition & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ export {Await} from './src/main/Await'
export {defaultRuntime} from './src/runtimes/DefaultRuntime'
export {FIO, IO, Task, TaskR, UIO} from './src/main/FIO'
export {FStream, Stream} from './src/main/FStream'
export {IFiber} from './src/main/IFiber'
export {Fiber} from './src/internals/Fiber'
export {IRuntime} from './src/runtimes/IRuntime'
export {Managed} from './src/main/Managed'
export {Queue} from './src/main/Queue'
Expand Down
100 changes: 75 additions & 25 deletions src/internals/FiberContext.ts → src/internals/Fiber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import {
LinkedListNode,
Option
} from 'standard-data-structures'
import {ICancellable, IScheduler} from 'ts-scheduler'
import {ICancellable} from 'ts-scheduler'

import {FIO, IO, UIO} from '../main/FIO'
import {IFiber} from '../main/IFiber'
import {Instruction, Tag} from '../main/Instructions'
import {IRuntime} from '../runtimes/IRuntime'

import {CancellationList} from './CancellationList'
import {CBOption} from './CBOption'
Expand All @@ -27,13 +27,37 @@ enum FiberStatus {
CANCELLED
}

/**
* Fibers are data structures that provide you a handle to control the execution of its `IO`.
* @typeparam E Exceptions that can be thrown
* @typeparam A The success value
*/
export abstract class Fiber<E, A> {
/**
* Uses a shared runtime to evaluate a [[FIO]] expression.
* Returns a [[ICancellable]] that can be used to interrupt the execution.
*/
public static unsafeExecuteWith<E, A>(
io: IO<E, A>,
runtime: IRuntime,
cb?: CBOption<E, A>
): ICancellable {
return FiberContext.unsafeExecuteWith<E, A>(io, runtime, cb)
}
public abstract abort: UIO<void>
public abstract await: UIO<Option<Either<E, A>>>
public abstract join: FIO<E, A>
public abstract runtime: IRuntime
public abstract release(p: UIO<void>): UIO<void>
}

/**
* FiberContext actually evaluates the FIO expression.
* Its creation is effectful.
* As soon as its created it starts to evaluate the FIO expression.
* It provides public APIs to [[Fiber]] to consume.
*/
export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
export class FiberContext<E, A> extends Fiber<E, A> implements ICancellable {
public get abort(): UIO<void> {
return UIO(() => this.cancel())
}
Expand All @@ -49,41 +73,41 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
/**
* Evaluates an IO using the provided scheduler
*/
public static evaluateWith<E, A>(
public static unsafeExecuteWith<E, A>(
io: IO<E, A>,
scheduler: IScheduler
runtime: IRuntime,
cb?: CBOption<E, A>
): FiberContext<E, A> {
return new FiberContext(scheduler, io.asInstruction)
}
const context = new FiberContext<E, A>(io.asInstruction, runtime)
if (cb !== undefined) {
context.unsafeObserve(cb)
}

public static of<E, A>(
scheduler: IScheduler,
p: FIO<E, A, unknown>
): FiberContext<E, A> {
return new FiberContext(scheduler, p.asInstruction)
return context
}

private static dispatchResult<E, A>(
result: Option<Either<E, A>>,
cb: CBOption<E, A>
): void {
cb(result)
}

private readonly cancellationList = new CancellationList()
private readonly node: LinkedListNode<ICancellable>
private node?: LinkedListNode<ICancellable>
private readonly observers = DoublyLinkedList.of<CBOption<E, A>>()
private result: Option<Either<E, A>> = Option.none()
private readonly stackA = new Array<Instruction>()
private readonly stackEnv = new Array<unknown>()
private status = FiberStatus.PENDING

private constructor(
private readonly scheduler: IScheduler,
instruction: Instruction
instruction: Instruction,
public readonly runtime: IRuntime
) {
super()
this.stackA.push(instruction)
this.node = this.cancellationList.push(
this.scheduler.asap(this.unsafeEvaluate.bind(this))
)
this.init()
}

public cancel(): void {
Expand All @@ -99,10 +123,18 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {

public unsafeObserve(cb: CBOption<E, A>): ICancellable {
if (this.status === FiberStatus.CANCELLED) {
return this.scheduler.asap(FiberContext.dispatchResult, Option.none(), cb)
return this.runtime.scheduler.asap(
FiberContext.dispatchResult,
Option.none(),
cb
)
}
if (this.status === FiberStatus.COMPLETED) {
return this.scheduler.asap(FiberContext.dispatchResult, this.result, cb)
return this.runtime.scheduler.asap(
FiberContext.dispatchResult,
this.result,
cb
)
}

const node = this.observers.add(cb)
Expand All @@ -116,11 +148,24 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
this.observers.map(_ => _(this.result))
}

private unsafeEvaluate(): void {
this.cancellationList.remove(this.node)
let data: unknown
private init(data?: unknown): void {
this.node = this.cancellationList.push(
this.runtime.scheduler.asap(this.unsafeEvaluate.bind(this), data)
)
}

private unsafeEvaluate(ddd?: unknown): void {
if (this.node !== undefined) {
this.cancellationList.remove(this.node)
this.node = undefined
}

let data: unknown = ddd
let count = 0
while (true) {
if (count === this.runtime.maxInstructionCount) {
return this.init(data)
}
try {
const j = this.stackA.pop()

Expand All @@ -137,6 +182,10 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
this.stackA.push(j.i0(...j.i1))
break

case Tag.Runtime:
data = this.runtime
break

case Tag.Reject:
while (
this.stackA.length > 0 &&
Expand Down Expand Up @@ -188,7 +237,7 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
// A new context is created so that computation from that instruction can happen separately.
// and then join back into the current context.
// Using the same stack will corrupt it completely.
const nContext = new FiberContext(this.scheduler, j.i0)
const nContext = new FiberContext(j.i0, j.i1)
this.cancellationList.push(nContext)
data = nContext
break
Expand Down Expand Up @@ -236,12 +285,13 @@ export class FiberContext<E, A> implements ICancellable, IFiber<E, A> {
} catch (e) {
this.stackA.push(FIO.reject(e).asInstruction)
}
count++
}
}

private unsafeRelease(p: UIO<void>): void {
this.cancellationList.push({
cancel: () => FiberContext.evaluateWith(p, this.scheduler)
cancel: () => Fiber.unsafeExecuteWith(p, this.runtime)
})
}
}
Loading

0 comments on commit ef9d03b

Please sign in to comment.