Skip to content

Commit

Permalink
fix(fromnodestream): enable fromnodestream tests, fix minification pr…
Browse files Browse the repository at this point in the history
…oblems (#156)
  • Loading branch information
trxcllnt authored and mattpodwysocki committed Nov 16, 2017
1 parent 4048b33 commit 745d763
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 37 deletions.
4 changes: 3 additions & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const argv = require(`command-line-args`)([
const IxKeywords = [
// GroupedIterable/GroupedAsyncIterable
`key`,
// PropertyDescriptors
`configurable`, `enumerable`,
// IteratorResult, Symbol.asyncIterator
`done`, `value`, `asyncIterator`,
// AsyncObserver
Expand Down Expand Up @@ -271,7 +273,7 @@ const compileUglifyJS = ((cache, commonConfig) => memoizeTask(cache, function ug
compress: { unsafe: true, },
output: { comments: false, beautify: false },
mangle: { eval: true, safari10: true, // <-- Works around a Safari 10 bug: // https://github.com/mishoo/UglifyJS2/issues/1753
properties: { reserved: [`configurable`, `enumerable`, ...reserved] }
properties: { reserved, keep_quoted: true }
}
},
})
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
},
"devDependencies": {
"@std/esm": "0.15.0",
"@std/esm": "0.14.0",
"@types/node": "8.0.53",
"@types/tape": "4.2.31",
"chalk": "2.3.0",
Expand Down
4 changes: 3 additions & 1 deletion spec/Ix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const path = require('path');
const target = process.env.IX_TARGET || ``;
const format = process.env.IX_MODULE || ``;
const useSrc = process.env.TEST_TS_SOURCE === `true`;

// these are duplicated in the gulpfile :<
const targets = [`es5`, `es2015`, `esnext`];
Expand All @@ -26,7 +27,8 @@ function throwInvalidImportError(name: string, value: string, values: string[])

let modulePath = ``;

if (target === `ts` || target === `ix`) modulePath = target;
if (useSrc) modulePath = '../src';
else if (target === `ts` || target === `ix`) modulePath = target;
else if (!~targets.indexOf(target)) throwInvalidImportError('target', target, targets);
else if (!~formats.indexOf(format)) throwInvalidImportError('module', format, formats);
else modulePath = path.join(target, format);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as Ix from '../Ix';
import * as test from 'tape-async';
const { fromNodeStream } = Ix;
import { fromNodeStream } from '../Ix';
import { Readable, ReadableOptions } from 'stream';

class Counter extends Readable {
Expand All @@ -10,36 +9,36 @@ class Counter extends Readable {
constructor(options?: ReadableOptions) {
super(options);
this._max = 3;
this._index = 1;
this._index = 0;
}

_read() {
const i = this._index++;
const i = ++this._index;
if (i > this._max) {
this.push(null);
} else {
const buf = Buffer.from(`${i}`, 'ascii');
const buf = Buffer.from(`${i}`, 'utf8');
this.push(buf);
}
}
}

test('AsyncIterable#fromNodeStream with readable', async t => {
const c = new Counter();
const c = new Counter({ objectMode: true });
const xs = fromNodeStream(c);

const it = xs[Symbol.asyncIterator]();
let next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('1', 'ascii')), 0);
t.equal((next.value as Buffer).compare(Buffer.from('1', 'utf8')), 0);

next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('2', 'ascii')), 0);
t.equal((next.value as Buffer).compare(Buffer.from('2', 'utf8')), 0);

next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('3', 'ascii')), 0);
t.equal((next.value as Buffer).compare(Buffer.from('3', 'utf8')), 0);

next = await it.next();
t.true(next.done);
Expand Down
8 changes: 5 additions & 3 deletions src/Ix.internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export { iterableXPipe as iterablePipe };
export { asynciterableX as asynciterable };
export { asynciterableXPipe as asynciterablePipe };

import './add/asynciterable/fromnodestream';
import { fromNodeStream, ReadableStreamAsyncIterable } from './asynciterable/fromnodestream';
export { fromNodeStream, ReadableStreamAsyncIterable };

/* These declarations are needed for the closure/umd targets */
export declare namespace Symbol {
export const iterator: symbol;
Expand All @@ -22,11 +26,9 @@ try {
Ix['iterablePipe'] = iterableXPipe;
Ix['asynciterable'] = asynciterableX;
Ix['asynciterablePipe'] = asynciterableXPipe;
Ix['fromNodeStream'] = fromNodeStream;
}
} catch (e) {
/* not the UMD bundle */
}
/** end google declarations */

import './add/asynciterable/fromnodestream';
export { fromNodeStream } from './asynciterable/fromnodestream';
42 changes: 20 additions & 22 deletions src/asynciterable/fromnodestream.ts
Original file line number Diff line number Diff line change
@@ -1,95 +1,93 @@
import { Readable } from 'stream';
import { AsyncIterableX } from './asynciterablex';

enum StreamState {
NonFlowing,
Readable,
Ended,
Errored
}
const NON_FLOWING = 0;
const READABLE = 1;
const ENDED = 2;
const ERRORED = 3;

class ReadableStreamAsyncIterable extends AsyncIterableX<string | Buffer>
export class ReadableStreamAsyncIterable extends AsyncIterableX<string | Buffer>
implements AsyncIterator<string | Buffer> {
private _stream: Readable;
private _size?: number;
private _state: StreamState;
private _state: number;
private _error: any;
private _rejectFns: Set<(err: any) => void>;

constructor(stream: Readable, size?: number) {
super();
this._stream = stream;
this._size = size;
this._state = StreamState.NonFlowing;
this._state = NON_FLOWING;
this._error = null;
this._rejectFns = new Set<(err: any) => void>();

const onError = (err: any) => {
this._state = StreamState.Errored;
this._state = ERRORED;
this._error = err;
for (const rejectFn of this._rejectFns) {
rejectFn(err);
}
};

const onEnd = () => {
this._state = StreamState.Ended;
this._state = ENDED;
};

this._stream.once('error', onError);
this._stream.once('end', onEnd);
this._stream['once']('error', onError);
this._stream['once']('end', onEnd);
}

[Symbol.asyncIterator](): AsyncIterator<string | Buffer> {
return this;
}

async next(): Promise<IteratorResult<string | Buffer>> {
if (this._state === StreamState.NonFlowing) {
if (this._state === NON_FLOWING) {
await Promise.race([this._waitReadable(), this._waitEnd()]);
return this.next();
}

if (this._state === StreamState.Ended) {
if (this._state === ENDED) {
return ({ done: true, value: undefined } as any) as IteratorResult<string | Buffer>;
}

if (this._state === StreamState.Errored) {
if (this._state === ERRORED) {
throw this._error;
}

const read = this._stream.read(this._size);
const read = this._stream['read'](this._size);
if (read !== null) {
return { done: false, value: <string | Buffer>read };
} else {
this._state = StreamState.NonFlowing;
this._state = NON_FLOWING;
return this.next();
}
}

private _waitReadable(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const onReadable = () => {
this._state = StreamState.Readable;
this._state = READABLE;
this._rejectFns.delete(reject);
resolve();
};

this._rejectFns.add(reject);
this._stream.once('readable', onReadable);
this._stream['once']('readable', onReadable);
});
}

private _waitEnd(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const onEnd = () => {
this._state = StreamState.Ended;
this._state = ENDED;
this._rejectFns.delete(reject);
resolve();
};

this._rejectFns.add(reject);
this._stream.once('end', onEnd);
this._stream['once']('end', onEnd);
});
}
}
Expand Down

0 comments on commit 745d763

Please sign in to comment.