-
Notifications
You must be signed in to change notification settings - Fork 29.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add final method #12828
stream: add final method #12828
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,12 @@ function WritableState(options, stream) { | |
// cast to ints. | ||
this.highWaterMark = Math.floor(this.highWaterMark); | ||
|
||
// if _final has been called | ||
this.finalCalled = false; | ||
|
||
// if _final has been called | ||
this.finalCalled = false; | ||
|
||
// drain event flag. | ||
this.needDrain = false; | ||
// at the start of calling end() | ||
|
@@ -199,6 +205,9 @@ function Writable(options) { | |
|
||
if (typeof options.destroy === 'function') | ||
this._destroy = options.destroy; | ||
|
||
if (typeof options.final === 'function') | ||
this._final = options.final; | ||
} | ||
|
||
Stream.call(this); | ||
|
@@ -520,23 +529,37 @@ function needFinish(state) { | |
!state.finished && | ||
!state.writing); | ||
} | ||
|
||
function prefinish(stream, state) { | ||
if (!state.prefinished) { | ||
function callFinal(stream, state) { | ||
stream._final((err) => { | ||
state.pendingcb--; | ||
if (err) { | ||
stream.emit('error', err); | ||
} | ||
state.prefinished = true; | ||
stream.emit('prefinish'); | ||
finishMaybe(stream, state); | ||
}); | ||
} | ||
function prefinish(stream, state) { | ||
if (!state.prefinished && !state.finalCalled) { | ||
if (typeof stream._final === 'function') { | ||
state.pendingcb++; | ||
state.finalCalled = true; | ||
process.nextTick(callFinal, stream, state); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @szmarczak This looks like a classic case where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I found some other cases that require moving it to next tick. 👍 |
||
} else { | ||
state.prefinished = true; | ||
stream.emit('prefinish'); | ||
} | ||
} | ||
} | ||
|
||
function finishMaybe(stream, state) { | ||
var need = needFinish(state); | ||
if (need) { | ||
prefinish(stream, state); | ||
if (state.pendingcb === 0) { | ||
prefinish(stream, state); | ||
state.finished = true; | ||
stream.emit('finish'); | ||
} else { | ||
prefinish(stream, state); | ||
} | ||
} | ||
return need; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,11 @@ | ||
'use strict'; | ||
require('../common'); | ||
const assert = require('assert'); | ||
const common = require('../common'); | ||
|
||
const Readable = require('stream').Readable; | ||
|
||
let _readCalled = false; | ||
function _read(n) { | ||
_readCalled = true; | ||
const _read = common.mustCall(function _read(n) { | ||
this.push(null); | ||
} | ||
}); | ||
|
||
const r = new Readable({ read: _read }); | ||
r.resume(); | ||
|
||
process.on('exit', function() { | ||
assert.strictEqual(r._read, _read); | ||
assert(_readCalled); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,25 @@ | ||
'use strict'; | ||
require('../common'); | ||
const common = require('../common'); | ||
const assert = require('assert'); | ||
|
||
const Transform = require('stream').Transform; | ||
|
||
let _transformCalled = false; | ||
function _transform(d, e, n) { | ||
_transformCalled = true; | ||
const _transform = common.mustCall(function _transform(d, e, n) { | ||
n(); | ||
} | ||
}); | ||
|
||
let _flushCalled = false; | ||
function _flush(n) { | ||
_flushCalled = true; | ||
const _final = common.mustCall(function _final(n) { | ||
n(); | ||
} | ||
}); | ||
|
||
const _flush = common.mustCall(function _flush(n) { | ||
n(); | ||
}); | ||
|
||
const t = new Transform({ | ||
transform: _transform, | ||
flush: _flush | ||
flush: _flush, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use |
||
final: _final | ||
}); | ||
|
||
const t2 = new Transform({}); | ||
|
@@ -34,6 +35,5 @@ assert.throws(() => { | |
process.on('exit', () => { | ||
assert.strictEqual(t._transform, _transform); | ||
assert.strictEqual(t._flush, _flush); | ||
assert.strictEqual(_transformCalled, true); | ||
assert.strictEqual(_flushCalled, true); | ||
assert.strictEqual(t._final, _final); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
'use strict'; | ||
const common = require('../common'); | ||
const assert = require('assert'); | ||
|
||
const stream = require('stream'); | ||
let state = 0; | ||
|
||
/* | ||
What you do | ||
var stream = new tream.Transform({ | ||
transform: function transformCallback(chunk, _, next) { | ||
// part 1 | ||
this.push(chunk); | ||
//part 2 | ||
next(); | ||
}, | ||
final: function endCallback(done) { | ||
// part 1 | ||
process.nextTick(function () { | ||
// part 2 | ||
done(); | ||
}); | ||
}, | ||
flush: function flushCallback(done) { | ||
// part 1 | ||
process.nextTick(function () { | ||
// part 2 | ||
done(); | ||
}); | ||
} | ||
}); | ||
t.on('data', dataListener); | ||
t.on('end', endListener); | ||
t.on('finish', finishListener); | ||
t.write(1); | ||
t.write(4); | ||
t.end(7, endMethodCallback); | ||
|
||
The order things are called | ||
|
||
1. transformCallback part 1 | ||
2. dataListener | ||
3. transformCallback part 2 | ||
4. transformCallback part 1 | ||
5. dataListener | ||
6. transformCallback part 2 | ||
7. transformCallback part 1 | ||
8. dataListener | ||
9. transformCallback part 2 | ||
10. finalCallback part 1 | ||
11. finalCallback part 2 | ||
12. flushCallback part 1 | ||
13. finishListener | ||
14. endMethodCallback | ||
15. flushCallback part 2 | ||
16. endListener | ||
*/ | ||
|
||
const t = new stream.Transform({ | ||
objectMode: true, | ||
transform: common.mustCall(function(chunk, _, next) { | ||
assert.strictEqual(++state, chunk, 'transformCallback part 1'); | ||
this.push(state); | ||
assert.strictEqual(++state, chunk + 2, 'transformCallback part 2'); | ||
process.nextTick(next); | ||
}, 3), | ||
final: common.mustCall(function(done) { | ||
state++; | ||
assert.strictEqual(state, 10, 'finalCallback part 1'); | ||
state++; | ||
assert.strictEqual(state, 11, 'finalCallback part 2'); | ||
done(); | ||
}, 1), | ||
flush: common.mustCall(function(done) { | ||
state++; | ||
assert.strictEqual(state, 12, 'flushCallback part 1'); | ||
process.nextTick(function() { | ||
state++; | ||
assert.strictEqual(state, 15, 'flushCallback part 2'); | ||
done(); | ||
}); | ||
}, 1) | ||
}); | ||
t.on('finish', common.mustCall(function() { | ||
state++; | ||
assert.strictEqual(state, 13, 'finishListener'); | ||
}, 1)); | ||
t.on('end', common.mustCall(function() { | ||
state++; | ||
assert.strictEqual(state, 16, 'end event'); | ||
}, 1)); | ||
t.on('data', common.mustCall(function(d) { | ||
assert.strictEqual(++state, d + 1, 'dataListener'); | ||
}, 3)); | ||
t.write(1); | ||
t.write(4); | ||
t.end(7, common.mustCall(function() { | ||
state++; | ||
assert.strictEqual(state, 14, 'endMethodCallback'); | ||
}, 1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you write it twice? Hmm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very likely a mistake. This has already been fixed.
https://github.com/nodejs/node/blob/master/lib/_stream_writable.js#L61-L63