Skip to content

Commit

Permalink
stream: simplify Transform stream implementation
Browse files Browse the repository at this point in the history
Significantly simplifies the Transform stream implementation by
relying mostly on standard stream code.

PR-URL: #32763
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
ronag committed Apr 15, 2020
1 parent f22a9ca commit 0bd5595
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 114 deletions.
160 changes: 52 additions & 108 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,66 +65,32 @@

const {
ObjectSetPrototypeOf,
Symbol
} = primordials;

module.exports = Transform;
const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);


// Completion handler for a single `_transform()` call. It is invoked with the
// stream as `this` (the constructor stores a bound copy on `_transformState`).
//
// `er`   - error reported by the transform, forwarded to the write callback.
// `data` - optional output chunk; pushed to the readable side unless it is
//          `null` or `undefined`.
//
// Emits an 'error' event (ERR_MULTIPLE_CALLBACK) when no write callback is
// pending, i.e. when the handler runs more than once for the same chunk.
function afterTransform(er, data) {
  const state = this._transformState;
  state.transforming = false;

  const pendingWrite = state.writecb;
  if (pendingWrite === null) {
    return this.emit('error', new ERR_MULTIPLE_CALLBACK());
  }

  // Clear the in-flight chunk before handing control back to the writer.
  state.writechunk = null;
  state.writecb = null;

  // Falsy-but-valid chunks (0, '') are still pushed; only null/undefined skip.
  if (data !== null && data !== undefined) {
    this.push(data);
  }

  pendingWrite(er);

  // Ask for another transform when the readable side still wants data.
  const readable = this._readableState;
  readable.reading = false;
  const wantsMore =
    readable.needReadable || readable.length < readable.highWaterMark;
  if (wantsMore) {
    this._read(readable.highWaterMark);
  }
}

const kCallback = Symbol('kCallback');

function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);

Duplex.call(this, options);

this._transformState = {
afterTransform: afterTransform.bind(this),
needTransform: false,
transforming: false,
writecb: null,
writechunk: null,
writeencoding: null
};

// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;

this[kCallback] = null;

if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
Expand All @@ -133,89 +99,67 @@ function Transform(options) {
this._flush = options.flush;
}

// When the writable side finishes, then flush out anything remaining.
// TODO(ronag): Unfortunately _final is invoked asynchronously.
// Use `prefinish` hack. `prefinish` is emitted synchronously when
// and only when `_final` is not defined. Implementing `_final`
// on a Transform should be an error.
this.on('prefinish', prefinish);
}

function prefinish() {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
done(this, er, data);
if (er) {
this.destroy(er);
return;
}

if (data != null) {
this.push(data);
}
this.push(null);
});
} else {
done(this, null, null);
this.push(null);
}
}

// Pushes a chunk to the readable side via Duplex's `push()`, first clearing
// the `needTransform` flag on the transform state. Returns whatever the
// underlying Duplex `push()` returns.
Transform.prototype.push = function(chunk, encoding) {
  const transformState = this._transformState;
  transformState.needTransform = false;

  const duplexPush = Duplex.prototype.push;
  return duplexPush.call(this, chunk, encoding);
};

// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side. You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
Transform.prototype._transform = function(chunk, encoding, callback) {
  // Default implementation: always throws. A `transform` function passed in
  // the constructor options replaces this method on the instance.
  const notImplemented = new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
  throw notImplemented;
};

Transform.prototype._write = function(chunk, encoding, cb) {
const ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
ts.writeencoding = encoding;
if (!ts.transforming) {
const rs = this._readableState;
if (ts.needTransform ||
rs.needReadable ||
rs.length < rs.highWaterMark)
this._read(rs.highWaterMark);
}
// Writable-side hook: runs `_transform()` on the chunk and decides when to
// acknowledge the write.
//
// `callback` is invoked immediately with the error if the transform fails.
// On success any non-nullish result is pushed to the readable side, and the
// write is acknowledged right away unless the readable buffer grew and now
// sits at or above its high-water mark. In that case the callback is parked
// on `this[kCallback]` (which `_read()` later invokes).
Transform.prototype._write = function(chunk, encoding, callback) {
  const readable = this._readableState;
  const writable = this._writableState;
  // Snapshot taken before transforming so we can tell whether anything was
  // actually added to the readable buffer.
  const lengthBefore = readable.length;

  const onTransformed = (err, val) => {
    if (err) {
      callback(err);
      return;
    }

    if (val != null) {
      this.push(val);
    }

    const canContinue =
      writable.ended || // Backwards compat.
      lengthBefore === readable.length || // Backwards compat.
      readable.length < readable.highWaterMark ||
      readable.length === 0;

    if (!canContinue) {
      // Hold the acknowledgement until the readable side asks for more.
      this[kCallback] = callback;
      return;
    }

    callback();
  };

  this._transform(chunk, encoding, onTransformed);
};

// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
const ts = this._transformState;

if (ts.writechunk !== null && !ts.transforming) {
ts.transforming = true;
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else {
// Mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
// Readable-side hook: if a write acknowledgement was parked on
// `this[kCallback]`, clear the slot and invoke it now. Does nothing when no
// callback is pending.
Transform.prototype._read = function() {
  const parked = this[kCallback];
  if (!parked) {
    return;
  }

  // Reset the slot before calling so the callback cannot be invoked twice.
  this[kCallback] = null;
  parked();
};


// Delegates destruction to Duplex's `_destroy()`, then reports the resulting
// error (first argument only) to `cb`.
Transform.prototype._destroy = function(err, cb) {
  const forwardError = (err2) => {
    cb(err2);
  };
  Duplex.prototype._destroy.call(this, err, forwardError);
};


// Finishes the readable side of `stream` after flushing.
//
// `er`   - when truthy, an 'error' event is emitted on the stream and its
//          result returned; nothing else happens.
// `data` - optional final chunk, pushed unless it is `null` or `undefined`.
//
// Throws ERR_TRANSFORM_WITH_LENGTH_0 if the writable state still has a
// non-zero length, and ERR_TRANSFORM_ALREADY_TRANSFORMING if a transform is
// still marked in progress. Otherwise returns the result of `push(null)`.
function done(stream, er, data) {
  if (er) {
    return stream.emit('error', er);
  }

  const hasFinalChunk = data !== null && data !== undefined;
  if (hasFinalChunk) {
    stream.push(data);
  }

  // These two error cases are coherence checks that can likely not be tested.
  const { length: pendingWrites } = stream._writableState;
  if (pendingWrites) {
    throw new ERR_TRANSFORM_WITH_LENGTH_0();
  }

  const { transforming: stillTransforming } = stream._transformState;
  if (stillTransforming) {
    throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
  }

  return stream.push(null);
}
4 changes: 0 additions & 4 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER',
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
'At least one category is required', TypeError);
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
'Calling transform done when still transforming', Error);

// This should probably be a `RangeError`.
E('ERR_TRANSFORM_WITH_LENGTH_0',
'Calling transform done when writableState.length != 0', Error);
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ const Transform = require('_stream_transform');

assert.strictEqual(tx.readableLength, 10);
assert.strictEqual(transformed, 10);
assert.strictEqual(tx._transformState.writechunk.length, 5);
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);
}), [5, 6, 7, 8, 9, 10]);
}

{
Expand Down
3 changes: 3 additions & 0 deletions test/parallel/test-zlib-flush-drain.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const ws = deflater._writableState;
const beforeFlush = ws.needDrain;
let afterFlush = ws.needDrain;

deflater.on('data', () => {
});

deflater.flush(function(err) {
afterFlush = ws.needDrain;
});
Expand Down

0 comments on commit 0bd5595

Please sign in to comment.