Skip to content

Commit

Permalink
Split performStream into functions handling stream of futures
Browse files Browse the repository at this point in the history
  • Loading branch information
paldepind committed Sep 7, 2019
1 parent c4b0282 commit e9cc723
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 292 deletions.
33 changes: 32 additions & 1 deletion src/behavior.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import { combine, isPlaceholder } from "./index";
import { State, Reactive, Time, BListener, Parent, SListener } from "./common";
import { Future, BehaviorFuture } from "./future";
import * as F from "./future";
import { Stream } from "./stream";
import {
Stream,
FlatFutureOrdered,
FlatFutureLatest,
FlatFuture
} from "./stream";
import { tick, getTime } from "./clock";
import { sample, Now } from "./now";

Expand Down Expand Up @@ -732,3 +737,29 @@ export function format(
): Behavior<string> {
return new FormatBehavior(strings, behaviors);
}

export const flatFutureFrom = <A>(
stream: Stream<Future<A>>
): Behavior<Stream<A>> => fromFunction(() => new FlatFuture(stream));

export function flatFuture<A>(stream: Stream<Future<A>>): Now<Stream<A>> {
return sample(flatFutureFrom(stream));
}

export const flatFutureOrderedFrom = <A>(
stream: Stream<Future<A>>
): Behavior<Stream<A>> => fromFunction(() => new FlatFutureOrdered(stream));

export function flatFutureOrdered<A>(
stream: Stream<Future<A>>
): Now<Stream<A>> {
return sample(flatFutureOrderedFrom(stream));
}

export const flatFutureLatestFrom = <A>(
stream: Stream<Future<A>>
): Behavior<Stream<A>> => fromFunction(() => new FlatFutureLatest(stream));

export function flatFutureLatest<A>(stream: Stream<Future<A>>): Now<Stream<A>> {
return sample(flatFutureLatestFrom(stream));
}
100 changes: 7 additions & 93 deletions src/now.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { IO, runIO } from "@funkia/io";
import { placeholder } from "./placeholder";
import { Time, SListener } from "./common";
import { Future, fromPromise, mapCbFuture } from "./future";
import { Node } from "./datastructures";
import { Time } from "./common";
import { Future, fromPromise, mapCbFuture, sinkFuture } from "./future";
import { Behavior } from "./behavior";
import { ActiveStream, Stream, mapCbStream, isStream } from "./stream";
import { Stream, mapCbStream, isStream } from "./stream";
import { tick } from "./clock";

export type MapNowTuple<A> = { [K in keyof A]: Now<A[K]> };
Expand Down Expand Up @@ -104,11 +103,11 @@ export function sample<A>(b: Behavior<A>): Now<A> {
}

export class PerformNow<A> extends Now<A> {
constructor(private cb: () => A) {
constructor(private _run: () => A) {
super();
}
run(): A {
return this.cb();
return this._run();
}
}

Expand All @@ -124,9 +123,9 @@ export function performIO<A>(comp: IO<A>): Now<Future<A>> {
return perform(() => fromPromise(runIO(comp)));
}

export function performStream<A>(s: Stream<IO<A>>): Now<Stream<A>> {
export function performStream<A>(s: Stream<IO<A>>): Now<Stream<Future<A>>> {
return perform(() =>
mapCbStream<IO<A>, A>((io, cb) => runIO(io).then(cb), s)
mapCbStream<IO<A>, Future<A>>((io, cb) => cb(fromPromise(runIO(io))), s)
);
}

Expand Down Expand Up @@ -157,91 +156,6 @@ export function performMap<A, B>(
);
}

class PerformIOLatestStream<A> extends ActiveStream<A>
implements SListener<IO<A>> {
private node: Node<this> = new Node(this);
constructor(s: Stream<IO<A>>) {
super();
s.addListener(this.node, tick());
}
next: number = 0;
newest: number = 0;
running: number = 0;
pushS(_t: number, io: IO<A>): void {
const time = ++this.next;
this.running++;
runIO(io).then((a: A) => {
this.running--;
if (time > this.newest) {
const t = tick();
if (this.running === 0) {
this.next = 0;
this.newest = 0;
} else {
this.newest = time;
}
this.pushSToChildren(t, a);
}
});
}
}

export class PerformStreamLatestNow<A> extends Now<Stream<A>> {
constructor(private s: Stream<IO<A>>) {
super();
}
run(): Stream<A> {
return new PerformIOLatestStream(this.s);
}
}

export function performStreamLatest<A>(s: Stream<IO<A>>): Now<Stream<A>> {
return perform(() => new PerformIOLatestStream(s));
}

class PerformIOStreamOrdered<A> extends ActiveStream<A> {
private node: Node<this> = new Node(this);
constructor(s: Stream<IO<A>>) {
super();
s.addListener(this.node, tick());
}
nextId: number = 0;
next: number = 0;
buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined
pushS(_t: number, io: IO<A>): void {
const id = this.nextId++;
runIO(io).then((a: A) => {
if (id === this.next) {
this.buffer[0] = { value: a };
this.pushFromBuffer();
} else {
this.buffer[id - this.next] = { value: a };
}
});
}
pushFromBuffer(): void {
while (this.buffer[0] !== undefined) {
const t = tick();
const { value } = this.buffer.shift();
this.pushSToChildren(t, value);
this.next++;
}
}
}

export class PerformStreamOrderedNow<A> extends Now<Stream<A>> {
constructor(private s: Stream<IO<A>>) {
super();
}
run(): Stream<A> {
return new PerformIOStreamOrdered(this.s);
}
}

export function performStreamOrdered<A>(s: Stream<IO<A>>): Now<Stream<A>> {
return new PerformStreamOrderedNow(s);
}

export function plan<A>(future: Future<Now<A>>): Now<Future<A>> {
return performMap<Now<A>, A>(runNow, future);
}
Expand Down
70 changes: 69 additions & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import {
accum
} from "./behavior";
import { tick } from "./clock";
import { Now, sample } from "./now";
import { Now, sample, perform } from "./now";
import { Future } from ".";

/**
* A stream is a list of occurrences over time. Each occurrence
Expand Down Expand Up @@ -477,3 +478,70 @@ export function mapCbStream<A, B>(
): Stream<B> {
return new PerformCbStream(cb, stream);
}

export class FlatFuture<A> extends Stream<A> {
constructor(stream: Stream<Future<A>>) {
super();
this.parents = cons(stream);
}
pushS(_t: number, fut: Future<A>): void {
fut.subscribe((a) => this.pushSToChildren(tick(), a));
}
}

export class FlatFutureOrdered<A> extends Stream<A> {
constructor(stream: Stream<Future<A>>) {
super();
this.parents = cons(stream);
}
nextId: number = 0;
next: number = 0;
buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined
pushS(_t: number, fut: Future<A>): void {
const id = this.nextId++;
fut.subscribe((a: A) => {
if (id === this.next) {
this.buffer[0] = { value: a };
this.pushFromBuffer();
} else {
this.buffer[id - this.next] = { value: a };
}
});
}
pushFromBuffer(): void {
while (this.buffer[0] !== undefined) {
const t = tick();
const { value } = this.buffer.shift();
this.pushSToChildren(t, value);
this.next++;
}
}
}

export class FlatFutureLatest<A> extends Stream<A>
implements SListener<Future<A>> {
constructor(stream: Stream<Future<A>>) {
super();
this.parents = cons(stream);
}
next: number = 0;
newest: number = 0;
running: number = 0;
pushS(_t: number, fut: Future<A>): void {
const time = ++this.next;
this.running++;
fut.subscribe((a: A) => {
this.running--;
if (time > this.newest) {
const t = tick();
if (this.running === 0) {
this.next = 0;
this.newest = 0;
} else {
this.newest = time;
}
this.pushSToChildren(t, a);
}
});
}
}
57 changes: 38 additions & 19 deletions src/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import {
ScanStream,
CombineStream,
SnapshotStream,
isStream
isStream,
FlatFuture,
FlatFutureOrdered,
FlatFutureLatest
} from "./stream";
import {
Behavior,
Expand All @@ -35,8 +38,6 @@ import {
FlatMapNow,
PerformNow,
PerformMapNow,
PerformStreamLatestNow,
PerformStreamOrderedNow,
Now,
MapNow,
InstantNow
Expand Down Expand Up @@ -218,6 +219,40 @@ DelayStream.prototype.model = function<A>(this: DelayStream<A>) {
return s.map(({ time, value }) => ({ time: time + this.ms, value }));
};

const flatFuture = <A>(o: Occurrence<Future<A>>) => {
const { time, value } = o.value.model();
return time === "infinity" ? [] : [{ time: Math.max(o.time, time), value }];
};

FlatFuture.prototype.model = function<A>(this: FlatFuture<A>) {
return (this.parents.value as Stream<Future<A>>)
.model()
.flatMap(flatFuture)
.sort((o, p) => o.time - p.time); // FIXME: Should use stable sort here
};

FlatFutureOrdered.prototype.model = function<A>(this: FlatFutureOrdered<A>) {
return (this.parents.value as Stream<Future<A>>)
.model()
.flatMap(flatFuture)
.reduce((acc, o) => {
const last = acc.length === 0 ? -Infinity : acc[acc.length - 1].time;
return acc.concat([{ time: Math.max(last, o.time), value: o.value }]);
}, []);
};

FlatFutureLatest.prototype.model = function<A>(this: FlatFutureLatest<A>) {
return (this.parents.value as Stream<Future<A>>)
.model()
.flatMap(flatFuture)
.reduceRight<Occurrence<A>[]>((acc, o) => {
const last = acc.length === 0 ? Infinity : acc[0].time;
return last < o.time
? acc
: [{ time: o.time, value: o.value }].concat(acc);
}, []);
};

class TestStream<A> extends Stream<A> {
constructor(private streamModel: StreamModel<A>) {
super();
Expand Down Expand Up @@ -400,22 +435,6 @@ PerformMapNow.prototype.model = function<A, B>(
return { value, mocks };
};

PerformStreamLatestNow.prototype.model = function<A>(
this: PerformStreamLatestNow<A>,
[value, ...mocks]: any[],
_t: Time
): NowModel<A> {
return { value, mocks };
};

PerformStreamOrderedNow.prototype.model = function<A>(
this: PerformStreamOrderedNow<A>,
[value, ...mocks]: any[],
_t: Time
): NowModel<A> {
return { value, mocks };
};

/**
* Test run a now computation without executing its side-effects.
* @param now The now computation to test.
Expand Down
Loading

0 comments on commit e9cc723

Please sign in to comment.