diff --git a/src/behavior.ts b/src/behavior.ts index 627832f..dece038 100644 --- a/src/behavior.ts +++ b/src/behavior.ts @@ -3,7 +3,12 @@ import { combine, isPlaceholder } from "./index"; import { State, Reactive, Time, BListener, Parent, SListener } from "./common"; import { Future, BehaviorFuture } from "./future"; import * as F from "./future"; -import { Stream } from "./stream"; +import { + Stream, + FlatFutureOrdered, + FlatFutureLatest, + FlatFuture +} from "./stream"; import { tick, getTime } from "./clock"; import { sample, Now } from "./now"; @@ -732,3 +737,29 @@ export function format( ): Behavior { return new FormatBehavior(strings, behaviors); } + +export const flatFutureFrom = ( + stream: Stream> +): Behavior> => fromFunction(() => new FlatFuture(stream)); + +export function flatFuture(stream: Stream>): Now> { + return sample(flatFutureFrom(stream)); +} + +export const flatFutureOrderedFrom = ( + stream: Stream> +): Behavior> => fromFunction(() => new FlatFutureOrdered(stream)); + +export function flatFutureOrdered( + stream: Stream> +): Now> { + return sample(flatFutureOrderedFrom(stream)); +} + +export const flatFutureLatestFrom = ( + stream: Stream> +): Behavior> => fromFunction(() => new FlatFutureLatest(stream)); + +export function flatFutureLatest(stream: Stream>): Now> { + return sample(flatFutureLatestFrom(stream)); +} diff --git a/src/now.ts b/src/now.ts index 32e8eb1..45f88b0 100644 --- a/src/now.ts +++ b/src/now.ts @@ -1,10 +1,9 @@ import { IO, runIO } from "@funkia/io"; import { placeholder } from "./placeholder"; -import { Time, SListener } from "./common"; -import { Future, fromPromise, mapCbFuture } from "./future"; -import { Node } from "./datastructures"; +import { Time } from "./common"; +import { Future, fromPromise, mapCbFuture, sinkFuture } from "./future"; import { Behavior } from "./behavior"; -import { ActiveStream, Stream, mapCbStream, isStream } from "./stream"; +import { Stream, mapCbStream, isStream } from "./stream"; import { tick } from "./clock"; export type MapNowTuple = { [K in keyof A]: Now }; @@ -104,11 +103,11 @@ export function sample(b: Behavior): Now { } export class PerformNow extends Now { - constructor(private cb: () => A) { + constructor(private _run: () => A) { super(); } run(): A { - return this.cb(); + return this._run(); } } @@ -124,9 +123,9 @@ export function performIO(comp: IO): Now> { return perform(() => fromPromise(runIO(comp))); } -export function performStream(s: Stream>): Now> { +export function performStream(s: Stream>): Now>> { return perform(() => - mapCbStream, A>((io, cb) => runIO(io).then(cb), s) + mapCbStream, Future>((io, cb) => cb(fromPromise(runIO(io))), s) ); } @@ -157,91 +156,6 @@ export function performMap( ); } -class PerformIOLatestStream extends ActiveStream - implements SListener> { - private node: Node = new Node(this); - constructor(s: Stream>) { - super(); - s.addListener(this.node, tick()); - } - next: number = 0; - newest: number = 0; - running: number = 0; - pushS(_t: number, io: IO): void { - const time = ++this.next; - this.running++; - runIO(io).then((a: A) => { - this.running--; - if (time > this.newest) { - const t = tick(); - if (this.running === 0) { - this.next = 0; - this.newest = 0; - } else { - this.newest = time; - } - this.pushSToChildren(t, a); - } - }); - } -} - -export class PerformStreamLatestNow extends Now> { - constructor(private s: Stream>) { - super(); - } - run(): Stream { - return new PerformIOLatestStream(this.s); - } -} - -export function performStreamLatest(s: Stream>): Now> { - return perform(() => new PerformIOLatestStream(s)); -} - -class PerformIOStreamOrdered extends ActiveStream { - private node: Node = new Node(this); - constructor(s: Stream>) { - super(); - s.addListener(this.node, tick()); - } - nextId: number = 0; - next: number = 0; - buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined - pushS(_t: number, io: IO): void { - const id = this.nextId++; - runIO(io).then((a: A) => { - if (id === this.next) { - this.buffer[0] = { value: a }; - this.pushFromBuffer(); - } else { - this.buffer[id - this.next] = { value: a }; - } - }); - } - pushFromBuffer(): void { - while (this.buffer[0] !== undefined) { - const t = tick(); - const { value } = this.buffer.shift(); - this.pushSToChildren(t, value); - this.next++; - } - } -} - -export class PerformStreamOrderedNow extends Now> { - constructor(private s: Stream>) { - super(); - } - run(): Stream { - return new PerformIOStreamOrdered(this.s); - } -} - -export function performStreamOrdered(s: Stream>): Now> { - return new PerformStreamOrderedNow(s); -} - export function plan(future: Future>): Now> { return performMap, A>(runNow, future); } diff --git a/src/stream.ts b/src/stream.ts index b9b1481..528eac0 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -10,7 +10,8 @@ import { accum } from "./behavior"; import { tick } from "./clock"; -import { Now, sample } from "./now"; +import { Now, sample, perform } from "./now"; +import { Future } from "."; /** * A stream is a list of occurrences over time. Each occurrence @@ -477,3 +478,70 @@ export function mapCbStream( ): Stream { return new PerformCbStream(cb, stream); } + +export class FlatFuture extends Stream { + constructor(stream: Stream>) { + super(); + this.parents = cons(stream); + } + pushS(_t: number, fut: Future): void { + fut.subscribe((a) => this.pushSToChildren(tick(), a)); + } +} + +export class FlatFutureOrdered extends Stream { + constructor(stream: Stream>) { + super(); + this.parents = cons(stream); + } + nextId: number = 0; + next: number = 0; + buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined + pushS(_t: number, fut: Future): void { + const id = this.nextId++; + fut.subscribe((a: A) => { + if (id === this.next) { + this.buffer[0] = { value: a }; + this.pushFromBuffer(); + } else { + this.buffer[id - this.next] = { value: a }; + } + }); + } + pushFromBuffer(): void { + while (this.buffer[0] !== undefined) { + const t = tick(); + const { value } = this.buffer.shift(); + this.pushSToChildren(t, value); + this.next++; + } + } +} + +export class FlatFutureLatest extends Stream + implements SListener> { + constructor(stream: Stream>) { + super(); + this.parents = cons(stream); + } + next: number = 0; + newest: number = 0; + running: number = 0; + pushS(_t: number, fut: Future): void { + const time = ++this.next; + this.running++; + fut.subscribe((a: A) => { + this.running--; + if (time > this.newest) { + const t = tick(); + if (this.running === 0) { + this.next = 0; + this.newest = 0; + } else { + this.newest = time; + } + this.pushSToChildren(t, a); + } + }); + } +} diff --git a/src/testing.ts b/src/testing.ts index 264356c..6bc9692 100644 --- a/src/testing.ts +++ b/src/testing.ts @@ -8,7 +8,10 @@ import { ScanStream, CombineStream, SnapshotStream, - isStream + isStream, + FlatFuture, + FlatFutureOrdered, + FlatFutureLatest } from "./stream"; import { Behavior, @@ -35,8 +38,6 @@ import { FlatMapNow, PerformNow, PerformMapNow, - PerformStreamLatestNow, - PerformStreamOrderedNow, Now, MapNow, InstantNow @@ -218,6 +219,40 @@ DelayStream.prototype.model = function(this: DelayStream) { return s.map(({ time, value }) => ({ time: time + this.ms, value })); }; +const flatFuture = (o: Occurrence>) => { + const { time, value } = o.value.model(); + return time === "infinity" ? [] : [{ time: Math.max(o.time, time), value }]; +}; + +FlatFuture.prototype.model = function(this: FlatFuture) { + return (this.parents.value as Stream>) + .model() + .flatMap(flatFuture) + .sort((o, p) => o.time - p.time); // FIXME: Should use stable sort here +}; + +FlatFutureOrdered.prototype.model = function(this: FlatFutureOrdered) { + return (this.parents.value as Stream>) + .model() + .flatMap(flatFuture) + .reduce((acc, o) => { + const last = acc.length === 0 ? -Infinity : acc[acc.length - 1].time; + return acc.concat([{ time: Math.max(last, o.time), value: o.value }]); + }, []); +}; + +FlatFutureLatest.prototype.model = function(this: FlatFutureLatest) { + return (this.parents.value as Stream>) + .model() + .flatMap(flatFuture) + .reduceRight[]>((acc, o) => { + const last = acc.length === 0 ? Infinity : acc[0].time; + return last < o.time + ? acc + : [{ time: o.time, value: o.value }].concat(acc); + }, []); +}; + class TestStream extends Stream { constructor(private streamModel: StreamModel) { super(); @@ -400,22 +435,6 @@ PerformMapNow.prototype.model = function( return { value, mocks }; }; -PerformStreamLatestNow.prototype.model = function( - this: PerformStreamLatestNow, - [value, ...mocks]: any[], - _t: Time -): NowModel { - return { value, mocks }; -}; - -PerformStreamOrderedNow.prototype.model = function( - this: PerformStreamOrderedNow, - [value, ...mocks]: any[], - _t: Time -): NowModel { - return { value, mocks }; -}; - /** * Test run a now computation without executing its side-effects. * @param now The now computation to test. diff --git a/test/now.ts b/test/now.ts index f3debdb..542e47d 100644 --- a/test/now.ts +++ b/test/now.ts @@ -10,8 +10,6 @@ import { performIO, Now, performStream, - performStreamLatest, - performStreamOrdered, plan, runNow, sample, @@ -20,7 +18,8 @@ import { SinkStream, time, toPromise, - instant + instant, + flatFuture } from "../src"; import * as H from "../src"; import { createRef, mutateRef } from "./helpers"; @@ -92,19 +91,18 @@ describe("Now", () => { }); }); describe("async", () => { - it("works with runNow", () => { + it("works with runNow", async () => { let resolve: (n: number) => void; const future = runNow( performIO( - callP((n: number) => new Promise((res) => (resolve = res)), 0) + callP((_n: number) => new Promise((res) => (resolve = res)), 0) ) ); setTimeout(() => { resolve(12); }); - return toPromise(future).then((result: number) => { - assert.deepEqual(result, 12); - }); + const result = await toPromise(future); + assert.deepEqual(result, 12); }); }); describe("sample", () => { @@ -119,7 +117,7 @@ describe("Now", () => { it("executes plan asynchronously", async () => { let resolve: (n: number) => void; let done = false; - const fn = withEffectsP((n: number) => { + const fn = withEffectsP((_n: number) => { return new Promise((res) => { resolve = res; }); @@ -151,7 +149,7 @@ describe("Now", () => { }); it("handles recursively defined behavior", () => { let resolve: (n: number) => void; - const getNextNr = withEffectsP((n: number) => { + const getNextNr = withEffectsP((_n: number) => { return new Promise((res) => { resolve = res; }); @@ -197,7 +195,9 @@ describe("Now", () => { }); const s = sinkStream(); const mappedS = s.map(impure); - runNow(performStream(mappedS)).subscribe((n) => results.push(n)); + runNow(performStream(mappedS).flatMap(flatFuture)).subscribe((n) => + results.push(n) + ); s.push(1); setTimeout(() => { s.push(2); @@ -232,105 +232,6 @@ describe("Now", () => { assert.deepEqual(cb.args, [[9]]); }); }); - describe("performStreamLatest", () => { - it("work with one occurrence", (done: Function) => { - let results: any[] = []; - const impure = withEffectsP( - (n: number) => new Promise((resolve, reject) => resolve(n)) - ); - const s = sinkStream(); - const mappedS = s.map(impure); - runNow(performStreamLatest(mappedS)).subscribe((n) => results.push(n)); - s.push(60); - setTimeout(() => { - assert.deepEqual(results, [60]); - done(); - }); - }); - it("runs io actions and ignores outdated results", (done: Function) => { - const resolves: ((n: any) => void)[] = []; - let results: any[] = []; - const impure = withEffectsP((n: number) => { - return new Promise((resolve, reject) => { - resolves[n] = resolve; - }); - }); - const s = sinkStream(); - const mappedS = s.map(impure); - runNow(performStreamLatest(mappedS)).subscribe((n) => results.push(n)); - s.push(0); - s.push(1); - s.push(2); - resolves[1](1); - resolves[2](2); - resolves[0](0); - setTimeout(() => { - assert.deepEqual(results, [1, 2]); - done(); - }); - }); - }); - describe("performStreamOrdered", () => { - it("work with one occurrence", (done: Function) => { - let results: any[] = []; - const impure = withEffectsP( - (n: number) => new Promise((resolve, reject) => resolve(n)) - ); - const s = sinkStream(); - const mappedS = s.map(impure); - runNow(performStreamOrdered(mappedS)).subscribe((n) => results.push(n)); - s.push(60); - setTimeout(() => { - assert.deepEqual(results, [60]); - done(); - }); - }); - it("runs io actions and makes sure to keep the results in the same order", (done: Function) => { - let results: any[] = []; - const resolves: ((n: any) => void)[] = []; - const impure = withEffectsP((n: number) => { - return new Promise((resolve, reject) => { - resolves[n] = resolve; - }); - }); - const s = sinkStream(); - const mappedS = s.map(impure); - runNow(performStreamOrdered(mappedS)).subscribe((n) => results.push(n)); - s.push(0); - s.push(1); - s.push(2); - s.push(3); - s.push(4); - s.push(5); - resolves[3](3); - resolves[1](1); - resolves[0]("zero"); - resolves[4](undefined); - resolves[2](2); - resolves[5](5); - setTimeout(() => { - assert.deepEqual(results, ["zero", 1, 2, 3, undefined, 5]); - done(); - }); - }); - - it("should support `undefined` as result", (done: any) => { - let results: any[] = []; - const impure = withEffectsP( - (n: number) => new Promise((resolve, reject) => resolve(n)) - ); - const s = sinkStream(); - const mappedS = s.map(impure); - runNow(performStreamOrdered(mappedS)).subscribe((n) => results.push(n)); - s.push(60); - s.push(undefined); - s.push(20); - setTimeout(() => { - assert.deepEqual(results, [60, undefined, 20]); - done(); - }); - }); - }); describe("loopNow", () => { it("should loop the reactives", () => { let result = []; diff --git a/test/stream.ts b/test/stream.ts index 31c64b7..a2aea8a 100644 --- a/test/stream.ts +++ b/test/stream.ts @@ -1,6 +1,14 @@ import { assert } from "chai"; import { spy, useFakeTimers } from "sinon"; -import { map, push, Behavior, fromFunction, sinkBehavior } from "../src"; +import { + map, + push, + Behavior, + fromFunction, + sinkBehavior, + sinkStream, + Future +} from "../src"; import * as H from "../src"; import { subscribeSpy } from "./helpers"; @@ -500,4 +508,51 @@ describe("stream", () => { assert.deepEqual(strings, [["s", "hello"], ["s", "world"]]); }); }); + describe("flatten futures", () => { + it("gives values in the order they resolve", () => { + const fut1 = H.sinkFuture(); + const fut2 = H.sinkFuture(); + const fut3 = H.sinkFuture(); + const s = H.sinkStream>(); + const s2 = H.runNow(H.flatFuture(s)); + const sub = subscribeSpy(s2); + s.push(fut1); + s.push(fut2); + s.push(fut3); + fut2.resolve(1); + fut1.resolve(2); + fut3.resolve(3); + assert.deepEqual(sub.args, [[1], [2], [3]]); + }); + it("can preserve order", () => { + const fut1 = H.sinkFuture(); + const fut2 = H.sinkFuture(); + const fut3 = H.sinkFuture(); + const s = H.sinkStream>(); + const s2 = H.runNow(H.flatFutureOrdered(s)); + const sub = subscribeSpy(s2); + s.push(fut1); + s.push(fut2); + s.push(fut3); + fut2.resolve(1); + fut1.resolve(2); + fut3.resolve(3); + assert.deepEqual(sub.args, [[2], [1], [3]]); + }); + it("discards outdated responses", () => { + const fut1 = H.sinkFuture(); + const fut2 = H.sinkFuture(); + const fut3 = H.sinkFuture(); + const s = H.sinkStream>(); + const s2 = H.runNow(H.flatFutureLatest(s)); + const sub = subscribeSpy(s2); + s.push(fut1); + s.push(fut2); + s.push(fut3); + fut2.resolve(1); + fut1.resolve(2); + fut3.resolve(3); + assert.deepEqual(sub.args, [[1], [3]]); + }); + }); }); diff --git a/test/testing.ts b/test/testing.ts index 3670e7a..6dbb409 100644 --- a/test/testing.ts +++ b/test/testing.ts @@ -1,6 +1,6 @@ import { assert } from "chai"; import * as H from "../src"; -import { Behavior, Stream, Now } from "../src"; +import { Behavior, Stream, Now, sinkBehavior } from "../src"; import { testFuture, assertFutureEqual, @@ -12,7 +12,7 @@ import { testNow, assertBehaviorEqual } from "../src/testing"; -import { createRef, mutateRef } from "./helpers"; +import { createRef, mutateRef, subscribeSpy } from "./helpers"; import { fgo, withEffects } from "@funkia/jabz"; describe("testing", () => { @@ -179,7 +179,7 @@ describe("testing", () => { }); it("works", () => { function foobar(s1: Stream, s2: Stream) { - const isEven = (n) => n % 2 === 0; + const isEven = (n: number) => n % 2 === 0; const a = s1.filter(isEven).map((n) => n * n); const b = s2.filter((n) => !isEven(n)).map(Math.sqrt); return a.combine(b); @@ -207,6 +207,63 @@ describe("testing", () => { assertStreamEqual(res, { 4: 1, 5: 1, 7: 2, 9: 3, 10: 1 }); }); }); + describe("flatFuture", () => { + it("can be tested", () => { + const s = testStreamFromObject({ + 0: testFuture(1, "a"), + 2: testFuture(5, "b"), + 4: testFuture(2, "c"), + 6: testFuture(7, "d") + }); + const res = testNow(H.flatFuture(s), []); + assert(H.isStream(res)); + assertStreamEqual( + res, + testStreamFromArray([[1, "a"], [4, "c"], [5, "b"], [7, "d"]]) + ); + }); + }); + describe("flatFutureLatest", () => { + it("can be tested", () => { + const s = testStreamFromObject({ + 0: testFuture(1, "a"), + 2: testFuture(6, "b"), // should be dropped + 4: testFuture(5, "c"), + 6: testFuture(12, "d"), // should be dropped + 8: testFuture(12, "e"), // should be dropped + 10: testFuture(3, "f") + }); + const res = testNow(H.flatFutureLatest(s), []); + assert(H.isStream(res)); + assertStreamEqual( + res, + testStreamFromArray([[1, "a"], [5, "c"], [10, "f"]]) + ); + }); + }); + describe("flatFutureOrdered", () => { + it("can be tested", () => { + const s = testStreamFromObject({ + 0: testFuture(3, "a"), + 1: testFuture(2, "b"), + 2: testFuture(4, "c"), + 3: testFuture(0, "d"), + 4: testFuture(5, "e") + }); + const res = testNow(H.flatFutureOrdered(s), []); + assert(H.isStream(res)); + assertStreamEqual( + res, + testStreamFromArray([ + [3, "a"], + [3, "b"], + [4, "c"], + [4, "d"], + [5, "e"] + ]) + ); + }); + }); }); describe("behavior", () => { describe("assertBehaviorEqual", () => { @@ -236,7 +293,7 @@ describe("testing", () => { }); describe("mapTo", () => { it("creates constant function", () => { - const b = testBehavior((t) => { + const b = testBehavior((_) => { throw new Error("Don't call me"); }); const mapped = b.mapTo(7); @@ -319,68 +376,18 @@ describe("testing", () => { return n + 2; }) ); - const response: Stream = yield H.performStream(request); - return { res: response }; - }); - const click = testStreamFromObject({ 1: 1, 2: 2, 3: 3, 4: 4, 5: 5 }); - const out: { res: Stream } = testNow(model({ click }), [ - testStreamFromArray([[0, "old1"], [1, "old2"], [2, "response"]]) - ]); - assert(H.isStream(out.res)); - assertStreamEqual( - out.res, - testStreamFromObject({ 0: "old1", 1: "old2", 2: "response" }) - ); - assert.deepEqual(requests, []); - }); - }); - describe("performStreamLatest", () => { - it("can be tested", () => { - let requests: number[] = []; - const model = fgo(function*({ click }) { - const request = click.mapTo( - withEffects((n: number) => { - requests.push(n); - return n + 2; - }) - ); - const response = yield H.performStreamLatest(request); - const res = H.stepperFrom("", response.map((e) => e.toString())); - return { res }; - }); - const click = testStreamFromObject({ 1: 1, 2: 2, 3: 3, 4: 4, 5: 5 }); - const out: { res: Behavior> } = testNow( - model({ click }), - [testStreamFromObject({ 0: "old", 1: "old", 2: "response" })] - ); - assert(H.isBehavior(out.res)); - assert.equal( - out.res - .model()(0) - .model()(4), - "response" - ); - assert.deepEqual(requests, []); - }); - }); - describe("performStreamOrdered", () => { - it("can be tested", () => { - let requests: number[] = []; - const model = fgo(function*({ click }) { - const request = click.mapTo( - withEffects((n: number) => { - requests.push(n); - return n + 2; - }) - ); - const response: Stream = yield H.performStreamOrdered( + const response: Stream = yield H.performStream( request - ); + ).flatMap(H.flatFutureOrdered); return { res: response }; }); const click = testStreamFromObject({ 1: 1, 2: 2, 3: 3, 4: 4, 5: 5 }); const out: { res: Stream } = testNow(model({ click }), [ - testStreamFromArray([[0, "old1"], [1, "old2"], [2, "response"]]) + testStreamFromArray([ + [0, testFuture(0, "old1")], + [1, testFuture(1, "old2")], + [2, testFuture(2, "response")] + ]) ]); assert(H.isStream(out.res)); assertStreamEqual( @@ -401,12 +408,18 @@ describe("testing", () => { return n + 2; }) ); - const res = run(H.performStreamOrdered(request)); + const res = run( + H.performStream(request).flatMap(H.flatFutureOrdered) + ); return { res }; }); const click = testStreamFromObject({ 1: 1, 2: 2, 3: 3, 4: 4, 5: 5 }); const out = testNow(model(click), [ - testStreamFromArray([[0, "old1"], [1, "old2"], [2, "response"]]) + testStreamFromArray([ + [0, testFuture(0, "old1")], + [1, testFuture(1, "old2")], + [2, testFuture(2, "response")] + ]) ]); assert(H.isStream(out.res)); assertStreamEqual( diff --git a/tsconfig.json b/tsconfig.json index 94219ed..7bdd462 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,10 +10,8 @@ "lib": [ "dom", "es5", - "es2015.core", - "es2015.promise", - "es2015.iterable", - "es2015.proxy" + "es2015", + "es2019" ] }, "include": ["src/**/*", "test/**/*"],