Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Switch to streamx #9

Merged
merged 20 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@

[![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url]

Wrap a ReadableStream in a TransformStream.
Wrap a `Readable` stream in a `Transform` stream.

## Usage

```js
var from = require('from2');
var { Readable } = require('streamx');
var concat = require('concat-stream');
var toThrough = require('to-through');

var readable = from([' ', 'hello', ' ', 'world']);
var readable = Readable.from([' ', 'hello', ' ', 'world']);

// Can be used as a Readable or Transform
var maybeTransform = toThrough(readable);

from(['hi', ' ', 'there', ','])
Readable.from(['hi', ' ', 'there', ','])
.pipe(maybeTransform)
.pipe(
concat(function (result) {
// result.toString() === 'hi there, hello world'
// result === 'hi there, hello world'
})
);
```
Expand All @@ -35,7 +35,8 @@ from(['hi', ' ', 'there', ','])

### `toThrough(readableStream)`

Takes a `readableStream` as the only argument and returns a `through2` stream. If the returned stream is piped before `nextTick`, the wrapped `readableStream` will not flow until the upstream is flushed. If the stream is not piped before `nextTick`, it is ended and flushed (acting as a proper readable).
Takes a `Readable` stream as the only argument and returns a `Transform` stream wrapper. Any data
piped into the `Transform` stream is piped passed along before any data from the wrapped `Readable` is injected into the stream.

## License

Expand Down
111 changes: 93 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,118 @@
'use strict';

var through = require('through2');
var Transform = require('streamx').Transform;

function forward(chunk, enc, cb) {
cb(null, chunk);
}
// Based on help from @mafintosh via https://gist.github.com/mafintosh/92836a8d03df0ef41356e233e0f06382

function toThrough(readable) {
var opts = {
objectMode: readable._readableState.objectMode,
highWaterMark: readable._readableState.highWaterMark,
};
var highWaterMark = readable._readableState.highWaterMark;

// Streamx uses 16384 as the default highWaterMark for everything and then
// divides it by 1024 for objects
// However, node's objectMode streams the number of objects as highWaterMark, so we need to
// multiply the objectMode highWaterMark by 1024 to make it streamx compatible
if (readable._readableState.objectMode) {
highWaterMark = readable._readableState.highWaterMark * 1024;
}

var destroyedByError = false;
var readableClosed = false;
var readableEnded = false;

function flush(cb) {
var self = this;

readable.on('readable', onReadable);
readable.on('end', cb);
// Afer all writes have drained, we change the `_read` implementation
self._read = function (cb) {
readable.resume();
cb();
};

readable.on('data', onData);
readable.once('error', onError);
readable.once('end', onEnd);

function cleanup() {
readable.off('data', onData);
readable.off('error', onError);
readable.off('end', onEnd);
}

function onReadable() {
var chunk;
while ((chunk = readable.read())) {
self.push(chunk);
function onData(data) {
var drained = self.push(data);
// When the stream is not drained, we pause it because `_read` will be called later
if (!drained) {
readable.pause();
}
}

function onError(err) {
cleanup();
cb(err);
}

function onEnd() {
cleanup();
cb();
}
}

var wrapper = through(opts, forward, flush);
// Handle the case where a user destroyed the returned stream
function predestroy() {
// Only call destroy on the readable if this `predestroy` wasn't
// caused via the readable having an `error` or `close` event
if (destroyedByError) {
return;
}
if (readableClosed) {
return;
}
readable.destroy(new Error('Wrapper destroyed'));
}

var wrapper = new Transform({
highWaterMark: highWaterMark,
flush: flush,
predestroy: predestroy,
});

// Forward errors from the underlying stream
readable.once('error', onError);
readable.once('end', onEnd);
readable.once('close', onClose);

function onError(err) {
destroyedByError = true;
wrapper.destroy(err);
}

function onEnd() {
readableEnded = true;
}

function onClose() {
readableClosed = true;
// Only destroy the wrapper if the readable hasn't ended successfully
if (!readableEnded) {
wrapper.destroy();
}
}

var shouldFlow = true;
wrapper.once('pipe', onPipe);
wrapper.on('piping', onPiping);
wrapper.on('newListener', onListener);
readable.on('error', wrapper.emit.bind(wrapper, 'error'));

function onPiping() {
maybeFlow();
wrapper.off('piping', onPiping);
wrapper.off('newListener', onListener);
}

function onListener(event) {
// Once we've seen the data or readable event, check if we need to flow
if (event === 'data' || event === 'readable') {
maybeFlow();
this.removeListener('newListener', onListener);
onPiping();
}
}

Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "to-through",
"version": "2.0.0",
"description": "Wrap a ReadableStream in a TransformStream.",
"description": "Wrap a Readable stream in a Transform stream.",
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)",
"contributors": [
"Blaine Bublitz <blaine.bublitz@gmail.com>"
Expand All @@ -22,16 +22,16 @@
"test": "nyc mocha --async-only"
},
"dependencies": {
"through2": "^2.0.3"
"streamx": "^2.12.5"
},
"devDependencies": {
"eslint": "^7.32.0",
"eslint-config-gulp": "^5.0.1",
"eslint-plugin-node": "^11.1.0",
"expect": "^27.4.2",
"mississippi": "^4.0.0",
"mocha": "^8.4.0",
"nyc": "^15.1.0"
"nyc": "^15.1.0",
"readable-stream": "^3.6.0"
},
"nyc": {
"reporter": [
Expand Down
Loading