Skip to content

Commit

Permalink
stream: simplify Transform stream implementation
Browse files Browse the repository at this point in the history
Significantly simplified Transform stream implementation by
using mostly standard stream code.
  • Loading branch information
ronag committed Apr 10, 2020
1 parent aeb7084 commit 6099382
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 291 deletions.
158 changes: 50 additions & 108 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,66 +65,32 @@

const {
ObjectSetPrototypeOf,
Symbol
} = primordials;

module.exports = Transform;
const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);


function afterTransform(er, data) {
const ts = this._transformState;
ts.transforming = false;

const cb = ts.writecb;

if (cb === null) {
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
}

ts.writechunk = null;
ts.writecb = null;

if (data != null) // Single equals check for both `null` and `undefined`
this.push(data);

cb(er);

const rs = this._readableState;
rs.reading = false;
if (rs.needReadable || rs.length < rs.highWaterMark) {
this._read(rs.highWaterMark);
}
}

const kCallback = Symbol('kCallback');

function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);

Duplex.call(this, options);

this._transformState = {
afterTransform: afterTransform.bind(this),
needTransform: false,
transforming: false,
writecb: null,
writechunk: null,
writeencoding: null
};

// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;

this[kCallback] = null;

if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
Expand All @@ -133,89 +99,65 @@ function Transform(options) {
this._flush = options.flush;
}

// When the writable side finishes, then flush out anything remaining.
// TODO(ronag): Unfortunately _final is invoked asynchronously.
// Use `prefinish` hack. `prefinish` is emitted synchronously when
// and only when `_final` is not defined. Implementing `_final`
// to a Transform should be an error.
this.on('prefinish', prefinish);
}

function prefinish() {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
done(this, er, data);
if (er) {
this.destroy(er);
return;
}

if (data != null) {
this.push(data);
}
this.push(null);
});
} else {
done(this, null, null);
this.push(null);
}
}

Transform.prototype.push = function(chunk, encoding) {
this._transformState.needTransform = false;
return Duplex.prototype.push.call(this, chunk, encoding);
};

// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side. You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
Transform.prototype._transform = function(chunk, encoding, callback) {
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
};

Transform.prototype._write = function(chunk, encoding, cb) {
const ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
ts.writeencoding = encoding;
if (!ts.transforming) {
const rs = this._readableState;
if (ts.needTransform ||
rs.needReadable ||
rs.length < rs.highWaterMark)
this._read(rs.highWaterMark);
}
Transform.prototype._write = function(chunk, encoding, callback) {
const rState = this._readableState;
const length = rState.length;

this._transform(chunk, encoding, (err, val) => {
if (err) {
callback(err);
return;
}

if (val != null) {
this.push(val);
}

if (
length === rState.length ||
rState.length < rState.highWaterMark ||
rState.length === 0
) {
callback();
} else {
this[kCallback] = callback;
}
});
};

// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
const ts = this._transformState;

if (ts.writechunk !== null && !ts.transforming) {
ts.transforming = true;
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else {
// Mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
Transform.prototype._read = function() {
if (this[kCallback]) {
const callback = this[kCallback];
this[kCallback] = null;
callback();
}
};


Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
});
};


function done(stream, er, data) {
if (er)
return stream.emit('error', er);

if (data != null) // Single equals check for both `null` and `undefined`
stream.push(data);

// These two error cases are coherence checks that can likely not be tested.
if (stream._writableState.length)
throw new ERR_TRANSFORM_WITH_LENGTH_0();

if (stream._transformState.transforming)
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
return stream.push(null);
}
4 changes: 0 additions & 4 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER',
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
'At least one category is required', TypeError);
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
'Calling transform done when still transforming', Error);

// This should probably be a `RangeError`.
E('ERR_TRANSFORM_WITH_LENGTH_0',
'Calling transform done when writableState.length != 0', Error);
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +
Expand Down
Loading

0 comments on commit 6099382

Please sign in to comment.