Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[utils/streams] implement some generic stream helpers #10187

Merged
merged 5 commits into from
Feb 11, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,14 @@ export deepCloneWithBuffers from './deep_clone_with_buffers';
export fromRoot from './from_root';
export pkg from './package_json';
export unset from './unset';

export {
createConcatStream,
createIntersperseStream,
createJsonParseStream,
createJsonStringifyStream,
createListStream,
createPromiseFromStreams,
createReduceStream,
createSplitStream,
} from './streams';
79 changes: 79 additions & 0 deletions src/utils/streams/__tests__/concat_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import expect from 'expect.js';

import {
createListStream,
createPromiseFromStreams,
createConcatStream
} from '../';

describe('concatStream', () => {
it('accepts an initial value', async () => {
const output = await createPromiseFromStreams([
createListStream([1,2,3]),
createConcatStream([])
]);

expect(output).to.eql([1,2,3]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't really clear what the empty array input does on the concat. Maybe [0] or something, to make it clearer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe [0] or something, to make it clearer?

Not sure what you mean. You understand what it's doing though?

Copy link
Contributor

@kimjoar kimjoar Feb 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant createConcatStream([0]) or something like that. Won't that end up as [0,1,2,3]? Might make the param a bit "clearer" in what role it has.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, sounds good

});

describe(`combines using the previous value's concat method`, () => {
it('works with strings', async () => {
const output = await createPromiseFromStreams([
createListStream([
'a',
'b',
'c'
]),
createConcatStream()
]);
expect(output).to.eql('abc');
});

it('works with strings', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numbers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrays actually

const output = await createPromiseFromStreams([
createListStream([
[1],
[2,3,4],
[10]
]),
createConcatStream()
]);
expect(output).to.eql([1,2,3,4,10]);
});

it('works with a mixture, starting with array', async () => {
const output = await createPromiseFromStreams([
createListStream([
[],
1,
2,
3,
4,
[5,6,7]
]),
createConcatStream()
]);
expect(output).to.eql([1,2,3,4,5,6,7]);
});

it('fails when the value does not have a concat method', async () => {
let promise;
try {
promise = createPromiseFromStreams([
createListStream([1, '1']),
createConcatStream()
]);
} catch (err) {
expect.fail('createPromiseFromStreams() should not fail synchronously');
}

try {
await promise;
expect.fail('Promise should have rejected');
} catch (err) {
expect(err).to.be.an(Error);
expect(err.message).to.contain('concat');
}
});
});
});
38 changes: 38 additions & 0 deletions src/utils/streams/__tests__/intersperse_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import expect from 'expect.js';
import sinon from 'sinon';

import {
createPromiseFromStreams,
createListStream,
createIntersperseStream,
createConcatStream
} from '../';

describe('intersperseStream', () => {
it('places the intersperse value between each provided value', async () => {
expect(
await createPromiseFromStreams([
createListStream(['to', 'be', 'or', 'not', 'to', 'be']),
createIntersperseStream(' '),
createConcatStream()
])
).to.be('to be or not to be');
});

it('emits the intersperse value right before the second value, does not wait', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this important?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to ensure that this isn't holding onto chunks it doesn't need to. I'll pick a better title

const str = createIntersperseStream('y');
const stub = sinon.stub();
str.on('data', stub);

str.write('a');
sinon.assert.calledOnce(stub);
expect(stub.firstCall.args).to.eql(['a']);
stub.reset();

str.write('b');
sinon.assert.calledTwice(stub);
expect(stub.firstCall.args).to.eql(['y']);
sinon.assert.calledTwice(stub);
expect(stub.secondCall.args).to.eql(['b']);
});
});
72 changes: 72 additions & 0 deletions src/utils/streams/__tests__/json_parse_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import expect from 'expect.js';

import {
createPromiseFromStreams,
createListStream,
createConcatStream,
createJsonParseStream
} from '../';

describe('jsonParseStream', () => {
describe('standard usage', () => {
it('parses json strings', async () => {
const str = createJsonParseStream();
const dataPromise = new Promise((resolve, reject) => {
str.on('data', resolve);
str.on('error', reject);
});
str.write('{ "foo": "bar" }');

expect(await dataPromise).to.eql({
foo: 'bar'
});
});

it('parses json value passed to it from a list stream', async () => {
expect(await createPromiseFromStreams([
createListStream([
'"foo"',
'1'
]),
createJsonParseStream(),
createConcatStream([])
]))
.to.eql(['foo', 1]);
});
});

describe('error handling', () => {
it('emits an error when there is a parse failure', async () => {
const str = createJsonParseStream();
const errorPromise = new Promise(resolve => str.once('error', resolve));
str.write('{"partial');
const err = await errorPromise;
expect(err).to.be.an(Error);
expect(err).to.have.property('name', 'SyntaxError');
});

it('continues parsing after an error', async () => {
const str = createJsonParseStream();

const firstEmitPromise = new Promise(resolve => {
str.once('error', v => resolve({ name: 'error', value: v }));
str.once('data', v => resolve({ name: 'data', value: v }));
});

str.write('{"partial');
const firstEmit = await firstEmitPromise;
expect(firstEmit).to.have.property('name', 'error');
expect(firstEmit.value).to.be.an(Error);

const secondEmitPromise = new Promise(resolve => {
str.once('error', v => resolve({ name: 'error', value: v }));
str.once('data', v => resolve({ name: 'data', value: v }));
});

str.write('42');
const secondEmit = await secondEmitPromise;
expect(secondEmit).to.have.property('name', 'data');
expect(secondEmit).to.have.property('value', 42);
});
});
});
79 changes: 79 additions & 0 deletions src/utils/streams/__tests__/json_stringify_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import expect from 'expect.js';

import {
createPromiseFromStreams,
createListStream,
createConcatStream,
createJsonStringifyStream
} from '../';

function createCircularStructure() {
const obj = {};
obj.obj = obj; // create circular reference
return obj;
}

describe('jsonStringifyStream', () => {
describe('standard usage', () => {
it('stringifys js values', async () => {
const str = createJsonStringifyStream();
const dataPromise = new Promise((resolve, reject) => {
str.on('data', resolve);
str.on('error', reject);
});
str.write({ foo: 'bar' });

expect(await dataPromise).to.be('{"foo":"bar"}');
});

it('stringifys js values passed from a list stream', async () => {
const all = await createPromiseFromStreams([
createListStream([
'foo',
1
]),
createJsonStringifyStream(),
createConcatStream([])
]);

expect(all).to.eql(['"foo"', '1']);
});
});

describe('error handling', () => {
it('emits an error when there is a parse failure', async () => {
const str = createJsonStringifyStream();
const errorPromise = new Promise(resolve => str.once('error', resolve));
str.write(createCircularStructure());
const err = await errorPromise;
expect(err).to.be.an(Error);
expect(err).to.have.property('name', 'TypeError');
expect(err.message).to.contain('circular');
});

it('continues parsing after an error', async () => {
const str = createJsonStringifyStream();

const firstEmitPromise = new Promise(resolve => {
str.once('error', v => resolve({ name: 'error', value: v }));
str.once('data', v => resolve({ name: 'data', value: v }));
});

str.write(createCircularStructure());

const firstEmit = await firstEmitPromise;
expect(firstEmit).to.have.property('name', 'error');
expect(firstEmit.value).to.be.an(Error);

const secondEmitPromise = new Promise(resolve => {
str.once('error', v => resolve({ name: 'error', value: v }));
str.once('data', v => resolve({ name: 'data', value: v }));
});

str.write('foo');
const secondEmit = await secondEmitPromise;
expect(secondEmit).to.have.property('name', 'data');
expect(secondEmit).to.have.property('value', '"foo"');
});
});
});
28 changes: 28 additions & 0 deletions src/utils/streams/__tests__/list_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import expect from 'expect.js';
import sinon from 'sinon';

import { createListStream } from '../';

describe('listStream', () => {
it('provides the values in the initial list', async () => {
const str = createListStream([1,2,3,4]);
const stub = sinon.stub();
str.on('data', stub);

await new Promise(resolve => str.on('end', resolve));

sinon.assert.callCount(stub, 4);
expect(stub.getCall(0).args).to.eql([1]);
expect(stub.getCall(1).args).to.eql([2]);
expect(stub.getCall(2).args).to.eql([3]);
expect(stub.getCall(3).args).to.eql([4]);
});

it('does not modify the list passed', async () => {
const list = [1,2,3,4];
const str = createListStream(list);
str.resume();
await new Promise(resolve => str.on('end', resolve));
expect(list).to.eql([1,2,3,4]);
});
});
Loading