Skip to content

Commit

Permalink
lib: use Writable#_final for 'finish' tracking
Browse files Browse the repository at this point in the history
Instead overriding `emit` method and using `this._realFinish` to detect
unexpected end of data - implement `_final` method and track the logic
there.

The only behavior change from the use of `_final` is that we would no
longer emit `finish` when the input data is terminated early. Instead we
would emit `error` as before and stop the processing. Note that this is
the behavior provided by `stream.Writable`, and it is thus conformant
with the specification.

Additionally, replace uses of private `_events` property with either
straight `emit()` with return value check, or `listenerCount()` in the
situations where there might be an overhead from the constructed event
arguments.

Fix: mscdex#26
  • Loading branch information
indutny authored and indutny-signal committed Mar 2, 2022
1 parent c64ada8 commit 0ab918d
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 53 deletions.
82 changes: 31 additions & 51 deletions lib/Dicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,40 +55,6 @@ class Dicer extends Writable {
});
}

emit(ev) {
if (ev !== 'finish' || this._realFinish) {
Writable.prototype.emit.apply(this, arguments);
return;
}

if (this._finished)
return;

process.nextTick(() => {
this.emit('error', new Error('Unexpected end of multipart data'));

if (this._part && !this._ignoreData) {
const type = (this._isPreamble ? 'Preamble' : 'Part');
this._part.emit(
'error',
new Error(`${type} terminated early due to `
+ 'unexpected end of multipart data')
);
this._part.push(null);
process.nextTick(() => {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
});
return;
}

this._realFinish = true;
this.emit('finish');
this._realFinish = false;
});
}

_write(data, encoding, cb) {
// Ignore unexpected data (e.g. extra trailer data after finished)
if (!this._hparser && !this._bparser)
Expand All @@ -97,9 +63,7 @@ class Dicer extends Writable {
if (this._headerFirst && this._isPreamble) {
if (!this._part) {
this._part = new PartStream(this._partOpts);
if (this._events.preamble)
this.emit('preamble', this._part);
else
if (!this.emit('preamble', this._part))
ignore(this);
}
const r = this._hparser.push(data);
Expand All @@ -123,6 +87,32 @@ class Dicer extends Writable {
cb();
}

_final(cb) {
if (this._finished) {
if (this._parts !== 0) {
this._pause = true;
this._cb = cb;
return;
}

cb();
return;
}

if (this._part && !this._ignoreData) {
const type = (this._isPreamble ? 'Preamble' : 'Part');
this._part.emit(
'error',
new Error(`${type} terminated early due to `
+ 'unexpected end of multipart data')
);
this._part.push(null);
ignore(this);
}

cb(new Error('Unexpected end of multipart data'));
}

reset() {
this._part = undefined;
this._bparser = undefined;
Expand Down Expand Up @@ -154,15 +144,13 @@ function onInfo(isMatch, data, start, end) {
}
}
if (this._dashes === 2) {
if ((start + i) < end && this._events.trailer)
if ((start + i) < end && this.listenerCount('trailer'))
this.emit('trailer', data.slice(start + i, end));
this.reset();
this._finished = true;
// No more parts will be added
if (this._parts === 0) {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
unpause(this);
}
}
if (this._dashes)
Expand All @@ -176,9 +164,7 @@ function onInfo(isMatch, data, start, end) {
unpause(this);
};
ev = this._isPreamble ? 'preamble' : 'part';
if (this._events[ev])
this.emit(ev, this._part);
else
if (!this.emit(ev, this._part))
ignore(this);
if (!this._isPreamble)
this._inHeader = true;
Expand Down Expand Up @@ -206,13 +192,7 @@ function onInfo(isMatch, data, start, end) {
++this._parts;
this._part.on('end', () => {
if (--this._parts === 0) {
if (this._finished) {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
} else {
unpause(this);
}
unpause(this);
}
});
}
Expand Down
7 changes: 5 additions & 2 deletions test/test-multipart.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ function next() {
});
}).on('error', (err) => {
error = err;
}).on('finish', () => {
onFinish();
}).on('finish', onFinish);

function onFinish() {
assert(finishes++ === 0, makeMsg(v.what, 'finish emitted multiple times'));

if (v.dicerError) {
Expand Down Expand Up @@ -242,7 +245,7 @@ function next() {
}
++t;
next();
});
}

fs.createReadStream(fixtureBase + '/original').pipe(dicer);
}
Expand Down
36 changes: 36 additions & 0 deletions test/test-pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';

const assert = require('assert');
const fs = require('fs');
const path = require('path');
const { inspect } = require('util');
const { Readable } = require('stream');
const { pipeline } = require('stream/promises');
const Dicer = require('..');

async function main() {
const r = new Readable({ read() {} });
const d = new Dicer({ boundary: 'a' });

d.on('part', async (part) => {
part.resume();
});

r.push(`--a\r\nA: 1\r\nB: 1\r\n\r\n123\r\n--a\r\n\r\n456\r\n--a--\r\n`);
setImmediate(() => {
r.push(null);
});

const timer = setTimeout(() => {
throw new Error('Should be canceled');
}, 2000);

await pipeline(r, d);

clearTimeout(timer);
}

main().catch((err) => {
console.error(err);
process.exit(1);
});

0 comments on commit 0ab918d

Please sign in to comment.