Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split performStream into functions handling stream of futures #68

Merged
merged 2 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: node_js
node_js:
- "node"
- "8"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the test implementations use Array#flatMap which this old NodeJS version does not support.


after_success:
- "npm run codecov"
- "npm run codecov"
35 changes: 34 additions & 1 deletion src/behavior.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import { combine, isPlaceholder } from "./index";
import { State, Reactive, Time, BListener, Parent, SListener } from "./common";
import { Future, BehaviorFuture } from "./future";
import * as F from "./future";
import { Stream } from "./stream";
import {
Stream,
FlatFuturesOrdered,
FlatFuturesLatest,
FlatFutures
} from "./stream";
import { tick, getTime } from "./clock";
import { sample, Now } from "./now";

Expand Down Expand Up @@ -747,3 +752,31 @@ export function format(
): Behavior<string> {
return new FormatBehavior(strings, behaviors);
}

export const flatFuturesFrom = <A>(
stream: Stream<Future<A>>
): Behavior<Stream<A>> => fromFunction(() => new FlatFutures(stream));

export function flatFutures<A>(stream: Stream<Future<A>>): Now<Stream<A>> {
return sample(flatFuturesFrom(stream));
}

export const flatFuturesOrderedFrom = <A>(
stream: Stream<Future<A>>
): Behavior<Stream<A>> => fromFunction(() => new FlatFuturesOrdered(stream));

export function flatFuturesOrdered<A>(
stream: Stream<Future<A>>
): Now<Stream<A>> {
return sample(flatFuturesOrderedFrom(stream));
}

export const flatFuturesLatestFrom = <A>(
stream: Stream<Future<A>>
): Behavior<Stream<A>> => fromFunction(() => new FlatFuturesLatest(stream));

export function flatFuturesLatest<A>(
stream: Stream<Future<A>>
): Now<Stream<A>> {
return sample(flatFuturesLatestFrom(stream));
}
100 changes: 7 additions & 93 deletions src/now.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { IO, runIO } from "@funkia/io";
import { placeholder } from "./placeholder";
import { Time, SListener } from "./common";
import { Future, fromPromise, mapCbFuture } from "./future";
import { Node } from "./datastructures";
import { Time } from "./common";
import { Future, fromPromise, mapCbFuture, sinkFuture } from "./future";
import { Behavior } from "./behavior";
import { ActiveStream, Stream, mapCbStream, isStream } from "./stream";
import { Stream, mapCbStream, isStream } from "./stream";
import { tick } from "./clock";

export type MapNowTuple<A> = { [K in keyof A]: Now<A[K]> };
Expand Down Expand Up @@ -104,11 +103,11 @@ export function sample<A>(b: Behavior<A>): Now<A> {
}

export class PerformNow<A> extends Now<A> {
constructor(private cb: () => A) {
constructor(private _run: () => A) {
super();
}
run(): A {
return this.cb();
return this._run();
}
}

Expand All @@ -124,9 +123,9 @@ export function performIO<A>(comp: IO<A>): Now<Future<A>> {
return perform(() => fromPromise(runIO(comp)));
}

export function performStream<A>(s: Stream<IO<A>>): Now<Stream<A>> {
export function performStream<A>(s: Stream<IO<A>>): Now<Stream<Future<A>>> {
return perform(() =>
mapCbStream<IO<A>, A>((io, cb) => runIO(io).then(cb), s)
mapCbStream<IO<A>, Future<A>>((io, cb) => cb(fromPromise(runIO(io))), s)
);
}

Expand Down Expand Up @@ -157,91 +156,6 @@ export function performMap<A, B>(
);
}

class PerformIOLatestStream<A> extends ActiveStream<A>
implements SListener<IO<A>> {
private node: Node<this> = new Node(this);
constructor(s: Stream<IO<A>>) {
super();
s.addListener(this.node, tick());
}
next: number = 0;
newest: number = 0;
running: number = 0;
pushS(_t: number, io: IO<A>): void {
const time = ++this.next;
this.running++;
runIO(io).then((a: A) => {
this.running--;
if (time > this.newest) {
const t = tick();
if (this.running === 0) {
this.next = 0;
this.newest = 0;
} else {
this.newest = time;
}
this.pushSToChildren(t, a);
}
});
}
}

export class PerformStreamLatestNow<A> extends Now<Stream<A>> {
constructor(private s: Stream<IO<A>>) {
super();
}
run(): Stream<A> {
return new PerformIOLatestStream(this.s);
}
}

export function performStreamLatest<A>(s: Stream<IO<A>>): Now<Stream<A>> {
return perform(() => new PerformIOLatestStream(s));
}

class PerformIOStreamOrdered<A> extends ActiveStream<A> {
private node: Node<this> = new Node(this);
constructor(s: Stream<IO<A>>) {
super();
s.addListener(this.node, tick());
}
nextId: number = 0;
next: number = 0;
buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined
pushS(_t: number, io: IO<A>): void {
const id = this.nextId++;
runIO(io).then((a: A) => {
if (id === this.next) {
this.buffer[0] = { value: a };
this.pushFromBuffer();
} else {
this.buffer[id - this.next] = { value: a };
}
});
}
pushFromBuffer(): void {
while (this.buffer[0] !== undefined) {
const t = tick();
const { value } = this.buffer.shift();
this.pushSToChildren(t, value);
this.next++;
}
}
}

export class PerformStreamOrderedNow<A> extends Now<Stream<A>> {
constructor(private s: Stream<IO<A>>) {
super();
}
run(): Stream<A> {
return new PerformIOStreamOrdered(this.s);
}
}

export function performStreamOrdered<A>(s: Stream<IO<A>>): Now<Stream<A>> {
return new PerformStreamOrderedNow(s);
}

export function plan<A>(future: Future<Now<A>>): Now<Future<A>> {
return performMap<Now<A>, A>(runNow, future);
}
Expand Down
70 changes: 69 additions & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import {
accum
} from "./behavior";
import { tick } from "./clock";
import { Now, sample } from "./now";
import { Now, sample, perform } from "./now";
import { Future } from ".";

/**
* A stream is a list of occurrences over time. Each occurrence
Expand Down Expand Up @@ -477,3 +478,70 @@ export function mapCbStream<A, B>(
): Stream<B> {
return new PerformCbStream(cb, stream);
}

export class FlatFutures<A> extends Stream<A> {
constructor(stream: Stream<Future<A>>) {
super();
this.parents = cons(stream);
}
pushS(_t: number, fut: Future<A>): void {
fut.subscribe((a) => this.pushSToChildren(tick(), a));
}
}

export class FlatFuturesOrdered<A> extends Stream<A> {
constructor(stream: Stream<Future<A>>) {
super();
this.parents = cons(stream);
}
nextId: number = 0;
next: number = 0;
buffer: { value: A }[] = []; // Object-wrapper to support a result as undefined
pushS(_t: number, fut: Future<A>): void {
const id = this.nextId++;
fut.subscribe((a: A) => {
if (id === this.next) {
this.buffer[0] = { value: a };
this.pushFromBuffer();
} else {
this.buffer[id - this.next] = { value: a };
}
});
}
pushFromBuffer(): void {
while (this.buffer[0] !== undefined) {
const t = tick();
const { value } = this.buffer.shift();
this.pushSToChildren(t, value);
this.next++;
}
}
}

export class FlatFuturesLatest<A> extends Stream<A>
implements SListener<Future<A>> {
constructor(stream: Stream<Future<A>>) {
super();
this.parents = cons(stream);
}
next: number = 0;
newest: number = 0;
running: number = 0;
pushS(_t: number, fut: Future<A>): void {
const time = ++this.next;
this.running++;
fut.subscribe((a: A) => {
this.running--;
if (time > this.newest) {
const t = tick();
if (this.running === 0) {
this.next = 0;
this.newest = 0;
} else {
this.newest = time;
}
this.pushSToChildren(t, a);
}
});
}
}
57 changes: 38 additions & 19 deletions src/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import {
ScanStream,
CombineStream,
SnapshotStream,
isStream
isStream,
FlatFutures,
FlatFuturesOrdered,
FlatFuturesLatest
} from "./stream";
import {
Behavior,
Expand All @@ -35,8 +38,6 @@ import {
FlatMapNow,
PerformNow,
PerformMapNow,
PerformStreamLatestNow,
PerformStreamOrderedNow,
Now,
MapNow,
InstantNow
Expand Down Expand Up @@ -218,6 +219,40 @@ DelayStream.prototype.model = function<A>(this: DelayStream<A>) {
return s.map(({ time, value }) => ({ time: time + this.ms, value }));
};

const flatFuture = <A>(o: Occurrence<Future<A>>) => {
const { time, value } = o.value.model();
return time === "infinity" ? [] : [{ time: Math.max(o.time, time), value }];
};

FlatFutures.prototype.model = function<A>(this: FlatFutures<A>) {
return (this.parents.value as Stream<Future<A>>)
.model()
.flatMap(flatFuture)
.sort((o, p) => o.time - p.time); // FIXME: Should use stable sort here
};

FlatFuturesOrdered.prototype.model = function<A>(this: FlatFuturesOrdered<A>) {
return (this.parents.value as Stream<Future<A>>)
.model()
.flatMap(flatFuture)
.reduce((acc, o) => {
const last = acc.length === 0 ? -Infinity : acc[acc.length - 1].time;
return acc.concat([{ time: Math.max(last, o.time), value: o.value }]);
}, []);
};

FlatFuturesLatest.prototype.model = function<A>(this: FlatFuturesLatest<A>) {
return (this.parents.value as Stream<Future<A>>)
.model()
.flatMap(flatFuture)
.reduceRight<Occurrence<A>[]>((acc, o) => {
const last = acc.length === 0 ? Infinity : acc[0].time;
return last < o.time
? acc
: [{ time: o.time, value: o.value }].concat(acc);
}, []);
};

class TestStream<A> extends Stream<A> {
constructor(private streamModel: StreamModel<A>) {
super();
Expand Down Expand Up @@ -400,22 +435,6 @@ PerformMapNow.prototype.model = function<A, B>(
return { value, mocks };
};

PerformStreamLatestNow.prototype.model = function<A>(
this: PerformStreamLatestNow<A>,
[value, ...mocks]: any[],
_t: Time
): NowModel<A> {
return { value, mocks };
};

PerformStreamOrderedNow.prototype.model = function<A>(
this: PerformStreamOrderedNow<A>,
[value, ...mocks]: any[],
_t: Time
): NowModel<A> {
return { value, mocks };
};

/**
* Test run a now computation without executing its side-effects.
* @param now The now computation to test.
Expand Down
Loading