Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: simplify Transform stream implementation #32763

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 52 additions & 108 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,66 +65,32 @@

const {
ObjectSetPrototypeOf,
Symbol
} = primordials;

module.exports = Transform;
const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);


function afterTransform(er, data) {
const ts = this._transformState;
ts.transforming = false;

const cb = ts.writecb;

if (cb === null) {
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
}

ts.writechunk = null;
ts.writecb = null;

if (data != null) // Single equals check for both `null` and `undefined`
this.push(data);

cb(er);

const rs = this._readableState;
rs.reading = false;
if (rs.needReadable || rs.length < rs.highWaterMark) {
this._read(rs.highWaterMark);
}
}

const kCallback = Symbol('kCallback');

function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);

Duplex.call(this, options);

this._transformState = {
afterTransform: afterTransform.bind(this),
needTransform: false,
transforming: false,
writecb: null,
writechunk: null,
writeencoding: null
};

// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;

this[kCallback] = null;

if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
Expand All @@ -133,89 +99,67 @@ function Transform(options) {
this._flush = options.flush;
}

// When the writable side finishes, then flush out anything remaining.
// TODO(ronag): Unfortunately _final is invoked asynchronously.
// Use `prefinish` hack. `prefinish` is emitted synchronously when
// and only when `_final` is not defined. Implementing `_final`
// to a Transform should be an error.
ronag marked this conversation as resolved.
Show resolved Hide resolved
this.on('prefinish', prefinish);
}

function prefinish() {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
done(this, er, data);
if (er) {
this.destroy(er);
return;
}

if (data != null) {
this.push(data);
}
this.push(null);
});
} else {
done(this, null, null);
this.push(null);
}
}

Transform.prototype.push = function(chunk, encoding) {
this._transformState.needTransform = false;
return Duplex.prototype.push.call(this, chunk, encoding);
};

// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side. You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
Transform.prototype._transform = function(chunk, encoding, callback) {
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
};

Transform.prototype._write = function(chunk, encoding, cb) {
const ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
ts.writeencoding = encoding;
if (!ts.transforming) {
const rs = this._readableState;
if (ts.needTransform ||
rs.needReadable ||
rs.length < rs.highWaterMark)
this._read(rs.highWaterMark);
}
Transform.prototype._write = function(chunk, encoding, callback) {
const rState = this._readableState;
const wState = this._writableState;
const length = rState.length;

this._transform(chunk, encoding, (err, val) => {
if (err) {
callback(err);
return;
}

if (val != null) {
this.push(val);
}

if (
wState.ended || // Backwards compat.
length === rState.length || // Backwards compat.
rState.length < rState.highWaterMark ||
rState.length === 0
) {
callback();
} else {
this[kCallback] = callback;
}
});
};

// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
const ts = this._transformState;

if (ts.writechunk !== null && !ts.transforming) {
ts.transforming = true;
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else {
// Mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
Transform.prototype._read = function() {
if (this[kCallback]) {
const callback = this[kCallback];
this[kCallback] = null;
callback();
}
};


Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
});
};


function done(stream, er, data) {
if (er)
return stream.emit('error', er);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emitting 'error' was a bug


if (data != null) // Single equals check for both `null` and `undefined`
stream.push(data);

// These two error cases are coherence checks that can likely not be tested.
if (stream._writableState.length)
throw new ERR_TRANSFORM_WITH_LENGTH_0();

if (stream._transformState.transforming)
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
return stream.push(null);
}
4 changes: 0 additions & 4 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER',
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
'At least one category is required', TypeError);
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
'Calling transform done when still transforming', Error);

// This should probably be a `RangeError`.
E('ERR_TRANSFORM_WITH_LENGTH_0',
'Calling transform done when writableState.length != 0', Error);
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ const Transform = require('_stream_transform');

assert.strictEqual(tx.readableLength, 10);
assert.strictEqual(transformed, 10);
assert.strictEqual(tx._transformState.writechunk.length, 5);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests internal stuff that don't exist anymore

assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);
ronag marked this conversation as resolved.
Show resolved Hide resolved
}), [5, 6, 7, 8, 9, 10]);
}

{
Expand Down
3 changes: 3 additions & 0 deletions test/parallel/test-zlib-flush-drain.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const ws = deflater._writableState;
const beforeFlush = ws.needDrain;
let afterFlush = ws.needDrain;

deflater.on('data', () => {
});
ronag marked this conversation as resolved.
Show resolved Hide resolved

deflater.flush(function(err) {
afterFlush = ws.needDrain;
});
Expand Down