Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add _end method to write streams (equivilent to _flush) #2314

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions doc/api/stream.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,9 @@ initialized.
#### Events: 'finish' and 'end'

The [`'finish'`][] and [`'end'`][] events are from the parent Writable
and Readable classes respectively. The `'finish'` event is fired after
[`stream.end()`][stream-end] is called and all chunks have been processed by
and Readable classes respectively. The `'finish'` event is fired after the
callback in `_end` has been called which is after [`stream.end()`][stream-end]
is called and all chunks have been processed by
[`stream._transform()`][stream-_transform], `'end'` is fired after all data has
been output which is after the callback in [`stream._flush()`][stream-_flush]
has been called.
Expand Down Expand Up @@ -1382,6 +1383,18 @@ This function is completely optional to implement. In most cases it is
unnecessary. If implemented, it will be called with all the chunks
that are buffered in the write queue.

#### writable.\_end(callback)

* `callback` {Function} Call this function (optionally with an error
argument) when you are done writing any remaining data.

Note: **This function MUST NOT be called directly.** It MAY be implemented
by child classes, and if so, will be called by the internal Writable
class methods only.

When the stream ends this function will be called before the stream closes,
useful if you need to close a resource or write some data that you had buffered.
This function is completely optional to implement.

## Simplified Constructor API

Expand Down
32 changes: 25 additions & 7 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ function WritableState(options, stream) {
this.ended = false;
// when 'finish' is emitted
this.finished = false;

// if _end has been called
this.endCalled = false;
// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// handling at a lower level.
Expand Down Expand Up @@ -152,6 +153,9 @@ function Writable(options) {

if (typeof options.writev === 'function')
this._writev = options.writev;

if (typeof options.end === 'function')
this._end = options.end;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a Symbol here instead of this._end?

The thought is that we can encourage new-style constructor use while also avoiding overloading a property that might be in use in the wild.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could but

  • it would be inconsistent with how the other methods work
  • after making the change the next thing I'd have to do is make a regex to undo it for readable-stream

}

Stream.call(this);
Expand Down Expand Up @@ -463,21 +467,35 @@ function needFinish(state) {
}

function prefinish(stream, state) {
if (!state.prefinished) {
state.prefinished = true;
stream.emit('prefinish');
if (!state.prefinished && !state.endCalled) {
if (typeof stream._end === 'function') {
state.pendingcb++;
state.endCalled = true;
process.nextTick(() =>
stream._end(function(err) {
state.pendingcb--;
if (err) {
stream.emit('error', err);
}
state.prefinished = true;
stream.emit('prefinish');
finishMaybe(stream, state);
})
);
} else {
state.prefinished = true;
stream.emit('prefinish');
}
}
}

function finishMaybe(stream, state) {
var need = needFinish(state);
if (need) {
prefinish(stream, state);
if (state.pendingcb === 0) {
prefinish(stream, state);
state.finished = true;
stream.emit('finish');
} else {
prefinish(stream, state);
}
}
return need;
Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-transform-constructor-set-methods.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ function _transform(d, e, n) {
n();
}

var _flushCalled = false;
function _flush(n) {
_flushCalled = true;
var _endCalled = false;
function _end(n) {
_endCalled = true;
n();
}

var t = new Transform({
transform: _transform,
flush: _flush
end: _end
});

t.end(Buffer.from('blerg'));
t.resume();

process.on('exit', function() {
assert.equal(t._transform, _transform);
assert.equal(t._flush, _flush);
assert.equal(t._end, _end);
assert(_transformCalled);
assert(_flushCalled);
assert(_endCalled);
});
102 changes: 102 additions & 0 deletions test/parallel/test-stream-transform-end.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
'use strict';
require('../common');
var assert = require('assert');

var stream = require('stream');
var state = 0;

/*
What you do
var stream = new tream.Transform({
transform: function transformCallback(chunk, _, next) {
// part 1
this.push(chunk);
//part 2
next();
},
end: function endCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
},
flush: function flushCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
}
});
t.on('data', dataListener);
t.on('end', endListener);
t.on('finish', finishListener);
t.write(1);
t.write(4);
t.end(7, endMethodCallback);

The order things are called

1. transformCallback part 1
2. dataListener
3. transformCallback part 2
4. transformCallback part 1
5. dataListener
6. transformCallback part 2
7. transformCallback part 1
8. dataListener
9. transformCallback part 2
10. endCallback part 1
11. endCallback part 2
12. flushCallback part 1
13. finishListener
14. endMethodCallback
15. flushCallback part 2
16. endListener
*/

var t = new stream.Transform({
objectMode: true,
transform: function(chunk, _, next) {
assert.equal(++state, chunk, 'transformCallback part 1');
this.push(state);
assert.equal(++state, chunk + 2, 'transformCallback part 2');
process.nextTick(next);
},
end: function(done) {
state++;
assert.equal(state, 10, 'endCallback part 1');
setTimeout(function() {
state++;
assert.equal(state, 11, 'endCallback part 2');
done();
}, 100);
},
flush: function(done) {
state++;
assert.equal(state, 12, 'flushCallback part 1');
process.nextTick(function() {
state++;
assert.equal(state, 15, 'flushCallback part 2');
done();
});
}
});
t.on('finish', function() {
state++;
assert.equal(state, 13, 'finishListener');
});
t.on('end', function() {
state++;
assert.equal(state, 16, 'end event');
});
t.on('data', function(d) {
assert.equal(++state, d + 1, 'dataListener');
});
t.write(1);
t.write(4);
t.end(7, function() {
state++;
assert.equal(state, 14, 'endMethodCallback');
});
23 changes: 23 additions & 0 deletions test/parallel/test-stream-write-end.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';
require('../common');
var assert = require('assert');

var stream = require('stream');
var w = new stream.Writable({
end: function(cb) {
assert(this === w);
setTimeout(function() {
shutdown = true;
cb();
}, 100);
},
write: function(chunk, e, cb) {
process.nextTick(cb);
}
});
var shutdown = false;
w.on('finish', function() {
assert(shutdown);
});
w.write(Buffer(1));
w.end(Buffer(0));
Loading